  1. Zeppelin
  2. ZEPPELIN-5909

Flink115SqlInterpreter throws IOException on successful insert operation


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.11.0
    • Fix Version/s: None
    • Component/s: zeppelin-interpreter
    • Labels: None
    • Flags: Patch

    Description

      A race condition in Flink 1.15 and above breaks the following code in Flink115SqlInterpreter:

      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
      checkState(tableResult.getJobClient().isPresent());
      try {
        tableResult.await();
        JobClient jobClient = tableResult.getJobClient().get();
        if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
          context.out.write("Insertion successfully.\n");
        } else {
          throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
        }
      } catch (InterruptedException e) {
        throw new IOException("Flink job is interrupted", e);
      } catch (ExecutionException e) {
        throw new IOException("Flink job is failed", e);
      }

      After tableResult.await() returns, jobClient.getJobStatus() can still report RUNNING for a short period even though the job result is already available. The else branch then runs, and the Zeppelin interpreter throws an IOException for a job that actually succeeded:

      Job is failed, Program execution finished

       
       
      Example of working patch:

      // Polls the job status until it is no longer RUNNING or the timeout elapses,
      // then throws unless the last observed status is FINISHED.
      private void waitForJobStatusFinishedOrThrow(
              final JobClient jobClient,
              final long timeoutMillis,
              final long retryIntervalMillis)
              throws IOException, InterruptedException, ExecutionException {
        final long startTime = System.currentTimeMillis();
        JobStatus status = jobClient.getJobStatus().get();
        while (status.equals(JobStatus.RUNNING) && System.currentTimeMillis() - startTime < timeoutMillis) {
          Thread.sleep(retryIntervalMillis);
          status = jobClient.getJobStatus().get();
        }
        // Any status other than FINISHED is an error, including RUNNING after the timeout.
        if (!status.equals(JobStatus.FINISHED)) {
          throw new IOException("Job reached terminal state with result: "
                  + jobClient.getJobExecutionResult().get().toString()
                  + " but job status is not FINISHED after timeout: " + timeoutMillis);
        }
      }

      // Call site: replaces the single getJobStatus() check after await().
      tableResult.await();
      waitForJobStatusFinishedOrThrow(
              tableResult.getJobClient().get(),
              TimeUnit.SECONDS.toMillis(30),
              TimeUnit.SECONDS.toMillis(1)
      );
      context.out.write("Insertion successfully.\n");
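
      The polling idea in the patch can be tried without a Flink cluster. The sketch below is only an illustration under stated assumptions: Status is a hypothetical three-value stand-in for Flink's JobStatus, laggingStatus is a made-up stub that reports RUNNING for a fixed number of reads before switching to FINISHED, and pollUntilNotRunning is a hypothetical helper, not part of Zeppelin or Flink. It differs from the patch in one detail: the deadline uses System.nanoTime(), which is monotonic, instead of System.currentTimeMillis().

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class JobStatusPolling {

    /** Hypothetical stand-in for Flink's JobStatus, reduced to three states. */
    public enum Status { RUNNING, FINISHED, FAILED }

    /**
     * Reads the status repeatedly until it is no longer RUNNING or the
     * timeout elapses, and returns the last status observed.
     */
    public static Status pollUntilNotRunning(Supplier<Status> statusSource,
                                             long timeoutMillis,
                                             long retryIntervalMillis)
            throws InterruptedException {
        // nanoTime() is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Status status = statusSource.get();
        while (status == Status.RUNNING && System.nanoTime() - deadline < 0) {
            Thread.sleep(retryIntervalMillis);
            status = statusSource.get();
        }
        return status;
    }

    /**
     * Stub that imitates the reported race: the first staleReads reads
     * return RUNNING, every later read returns FINISHED.
     */
    public static Supplier<Status> laggingStatus(int staleReads) {
        final AtomicInteger reads = new AtomicInteger();
        return () -> reads.getAndIncrement() < staleReads ? Status.RUNNING : Status.FINISHED;
    }

    public static void main(String[] args) throws InterruptedException {
        // A single read, as in the original interpreter code, sees the stale status.
        Status single = laggingStatus(2).get();
        // Polling keeps reading until the status catches up.
        Status polled = pollUntilNotRunning(laggingStatus(2), 1000, 10);
        System.out.println("single read: " + single + ", polled: " + polled);
    }
}
```

      With the stub lagging by two reads, the single read observes RUNNING while the polled read observes FINISHED. If the status never leaves RUNNING, the helper returns RUNNING once the timeout elapses, and the caller can then decide whether to treat that as a failure, as the patch does.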

          People

            Assignee: Unassigned
            Reporter: Aleksandr Iushmanov (izeren)