Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 0.11.0
None
None
Patch
Description
There is a race condition in Flink 1.15 and above that breaks the following code in the Zeppelin Flink interpreter:
    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
    checkState(tableResult.getJobClient().isPresent());
    try {
        tableResult.await();
        JobClient jobClient = tableResult.getJobClient().get();
        // Race: await() has returned, but getJobStatus() may still report RUNNING here.
        if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
            context.out.write("Insertion successfully.\n");
        } else {
            throw new IOException("Job is failed, "
                    + jobClient.getJobExecutionResult().get().toString());
        }
    } catch (InterruptedException e) {
        throw new IOException("Flink job is interrupted", e);
    } catch (ExecutionException e) {
        throw new IOException("Flink job is failed", e);
    }
jobClient.getJobStatus() can briefly return RUNNING even though the job result is already available (tableResult.await() has returned). The status check then falls into the else branch, and the Zeppelin interpreter throws an IOException even though the job succeeded:
Job is failed, Program execution finished
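The race can be modeled without Flink. In the sketch below, LaggingClient, Status and checkOnce are hypothetical stand-ins, not Flink or Zeppelin classes: the result future is already complete, but the status query returns a stale RUNNING for a configurable number of reads. A single status check made right after waiting on the result then reports a failure, producing the same message as above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the race; none of these types are Flink classes.
public class StaleStatusDemo {

    enum Status { RUNNING, FINISHED, FAILED }

    // Stand-in for a job client whose status view lags behind its result.
    static final class LaggingClient {
        private final CompletableFuture<String> result =
                CompletableFuture.completedFuture("Program execution finished");
        private final AtomicInteger remainingStaleReads;

        LaggingClient(int staleReads) {
            this.remainingStaleReads = new AtomicInteger(staleReads);
        }

        CompletableFuture<String> getResult() {
            return result;
        }

        CompletableFuture<Status> getStatus() {
            // Report RUNNING until the configured number of stale reads is used up.
            Status s = remainingStaleReads.getAndDecrement() > 0 ? Status.RUNNING : Status.FINISHED;
            return CompletableFuture.completedFuture(s);
        }
    }

    // Mirrors the original check: wait for the result, then read the status exactly once.
    static String checkOnce(LaggingClient client) throws InterruptedException, ExecutionException {
        client.getResult().get(); // plays the role of tableResult.await()
        if (client.getStatus().get() == Status.FINISHED) {
            return "Insertion successfully.";
        }
        return "Job is failed, " + client.getResult().get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(checkOnce(new LaggingClient(1))); // one stale read: reported as a failure
        System.out.println(checkOnce(new LaggingClient(0))); // no lag: reported as success
    }
}
```

With one stale read the first call prints "Job is failed, Program execution finished" although the result was already available; with no lag it prints "Insertion successfully.".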
Example of a working patch:
    private void waitForJobStatusFinishedOrThrow(
            final JobClient jobClient, final long timeoutMillis, final long retryIntervalMillis)
            throws IOException, InterruptedException, ExecutionException {
        final long startTime = System.currentTimeMillis();
        JobStatus status = jobClient.getJobStatus().get();
        // Keep polling while the status still reads RUNNING, up to timeoutMillis.
        while (status.equals(JobStatus.RUNNING)
                && System.currentTimeMillis() - startTime < timeoutMillis) {
            Thread.sleep(retryIntervalMillis);
            status = jobClient.getJobStatus().get();
        }
        // Any status other than FINISHED (e.g. FAILED, or still RUNNING after the timeout) is an error.
        if (!status.equals(JobStatus.FINISHED)) {
            throw new IOException("Job reached terminal state with result: "
                    + jobClient.getJobExecutionResult().get().toString()
                    + " but job status is not FINISHED after timeout: " + timeoutMillis);
        }
    }
    tableResult.await();
    waitForJobStatusFinishedOrThrow(
            tableResult.getJobClient().get(),
            TimeUnit.SECONDS.toMillis(30),
            TimeUnit.SECONDS.toMillis(1)
    );
    context.out.write("Insertion successfully.\n");
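The same polling idea can be isolated from Flink for testing. The sketch below is a hypothetical variant, not part of the patch: StatusPoller, Status and pollWhileRunning are illustrative names, and the status source is an arbitrary Supplier of CompletableFuture. One deliberate change from the patch is that the deadline uses System.nanoTime() instead of System.currentTimeMillis(), because nanoTime() is monotonic and wall-clock adjustments cannot shorten or extend the wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical, Flink-independent version of the polling loop in the patch.
public class StatusPoller {

    enum Status { RUNNING, FINISHED, FAILED }

    // Re-queries the status while it reads RUNNING, for at most timeoutMillis.
    // Returns the last observed status; the caller decides what counts as success.
    static Status pollWhileRunning(
            Supplier<CompletableFuture<Status>> statusQuery,
            long timeoutMillis,
            long retryIntervalMillis) throws InterruptedException, ExecutionException {
        // Monotonic deadline; compare via subtraction to stay correct if nanoTime() overflows.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Status status = statusQuery.get().get();
        while (status == Status.RUNNING && System.nanoTime() - deadline < 0) {
            Thread.sleep(retryIntervalMillis);
            status = statusQuery.get().get();
        }
        return status;
    }

    public static void main(String[] args) throws Exception {
        // Simulated status source: RUNNING for the first three queries, then FINISHED.
        int[] calls = {0};
        Supplier<CompletableFuture<Status>> query = () ->
                CompletableFuture.completedFuture(++calls[0] <= 3 ? Status.RUNNING : Status.FINISHED);
        Status s = pollWhileRunning(query, 1_000, 10);
        System.out.println(s + " after " + calls[0] + " queries"); // FINISHED after 4 queries
    }
}
```

Returning the last observed status, rather than throwing inside the helper, keeps the timeout policy separate from error reporting, so the caller can still build the same IOException message as the patch.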