Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Affects Version/s: 2.1.0, 2.2.0, 2.3.0
None
Description
Issue
Beam's AfterProcessingTime trigger does not always fire reliably after the configured delay.
The job below configures a trigger that should fire once the watermark passes the end of the window, then every 5 seconds for late data, and finally at the end of the allowed lateness.
Expected behaviour
The late firing of the AfterProcessingTime trigger should occur 5 seconds after the first late record arrives in the pane.
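To make the expected timing concrete, here is a minimal sketch that computes the three firing points for one window using only the durations from the job in this report (10 second window, 5 second late-firing delay, 2 minute allowed lateness). It is a simplified model of the expected behaviour, not Beam's implementation. The class name, method names and the sample timestamps are assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of the expected firing times for one fixed window (not Beam code). */
public class LateFiringSchedule {
    // Durations taken from the job in this report.
    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
    static final Duration LATE_FIRING_DELAY = Duration.ofSeconds(5);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(2);

    /** Event time the watermark must pass for the on-time pane to fire. */
    static Instant onTimeFiring(Instant windowStart) {
        return windowStart.plus(WINDOW_SIZE);
    }

    /** Processing time at which the late pane should fire: first late element plus the delay. */
    static Instant expectedLateFiring(Instant firstLateElementProcessingTime) {
        return firstLateElementProcessingTime.plus(LATE_FIRING_DELAY);
    }

    /** Event time the watermark must pass for the window to expire (end of allowed lateness). */
    static Instant windowExpiry(Instant windowStart) {
        return onTimeFiring(windowStart).plus(ALLOWED_LATENESS);
    }

    public static void main(String[] args) {
        // Hypothetical timestamps, chosen only to illustrate the arithmetic.
        Instant windowStart = Instant.parse("2018-03-15T10:00:00Z");
        Instant firstLateArrival = Instant.parse("2018-03-15T10:00:12Z");

        System.out.println("On-time firing (event time):     " + onTimeFiring(windowStart));
        System.out.println("Late firing (processing time):   " + expectedLateFiring(firstLateArrival));
        System.out.println("Window expiry (event time):      " + windowExpiry(windowStart));
    }
}
```

Note that the on-time firing and the window expiry are driven by the watermark (event time), while the late firing is driven by processing time, so the three values are not directly comparable on one clock.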
Actual behaviour
In my tests the late trigger works for some keys but not for others, and which keys are affected appears random. The DummySource generates 15 distinct keys (AA, BB, ..., PP). For each key it sends 5 on-time records and one late record. If the late firing is missed, the pane does not fire until the allowed lateness period ends.
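One way to see which keys are affected in a given run is to compare, per key, when the late record arrived with when the late pane was actually logged. The sketch below assumes those two timestamps have already been collected into maps (for example from the job's log output). The class, the map layout, the tolerance parameter and the sample values are all hypothetical and are not part of the reproduction repository.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical helper that flags keys whose late pane fired too late or not at all. */
public class MissedLateFirings {
    // Late-firing delay configured in the job from this report.
    static final Duration LATE_FIRING_DELAY = Duration.ofSeconds(5);

    /**
     * Returns the keys whose late pane was not observed within
     * LATE_FIRING_DELAY + tolerance of the late record's arrival (processing time).
     */
    static SortedSet<String> affectedKeys(
            Map<String, Instant> lateRecordArrival,
            Map<String, Instant> latePaneObserved,
            Duration tolerance) {
        SortedSet<String> affected = new TreeSet<>();
        for (Map.Entry<String, Instant> entry : lateRecordArrival.entrySet()) {
            Instant deadline = entry.getValue().plus(LATE_FIRING_DELAY).plus(tolerance);
            Instant observed = latePaneObserved.get(entry.getKey());
            // A missing entry means no late pane was logged for this key at all.
            if (observed == null || observed.isAfter(deadline)) {
                affected.add(entry.getKey());
            }
        }
        return affected;
    }

    public static void main(String[] args) {
        // Made-up timestamps for two keys, purely for illustration.
        Map<String, Instant> arrivals = new HashMap<>();
        arrivals.put("AA", Instant.parse("2018-03-15T10:00:12Z"));
        arrivals.put("BB", Instant.parse("2018-03-15T10:00:12Z"));

        Map<String, Instant> panes = new HashMap<>();
        panes.put("AA", Instant.parse("2018-03-15T10:00:17Z")); // fired after 5 s
        panes.put("BB", Instant.parse("2018-03-15T10:02:10Z")); // fired only at window expiry

        System.out.println(affectedKeys(arrivals, panes, Duration.ofSeconds(1)));
    }
}
```

The tolerance exists because processing-time timers are not expected to fire at the exact millisecond, so a small slack avoids flagging healthy keys.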
Job code
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

PCollection<String> apply = pipeline
    .apply(Read.from(new DummySource()))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(2), Window.ClosingBehavior.FIRE_IF_NON_EMPTY));

apply.apply(Count.perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
      @ProcessElement
      public void process(ProcessContext context, BoundedWindow window) {
        LOG.info("Count: {}. For window {}, Pane {}", context.element(), window, context.pane());
      }
    }));

pipeline.run().waitUntilFinish();
How can you replicate the issue?
I've created a GitHub repo, https://github.com/pbartoszek/BEAM-3863_late_trigger, with the code shown above. Please see the README file for details on how to replicate the issue.
What is causing the issue?
I explained the cause in the PR.