  1. Beam
  2. BEAM-3225

Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations

Details

    Description

      Context

      I ran my test code against different triggers and runners. My original problem was that, when writing to a file sink, files weren't always produced deterministically (see BEAM-3151). When I started looking at the WriteFiles class, I noticed that the file sink implementation includes multiple GroupByKey transformations. I therefore wrote test code that uses multiple GroupByKey transformations, and concluded that GroupByKey's support for After(Synchronized)ProcessingTime triggers is buggy, which also affects the file sink. When I ran my job on the Dataflow runner, I got the expected output.

      About test code

      The job counts how many A and B elements it received within 30-second windows using Count.perElement. A GroupByKey then follows, which should fire every time a count increases.
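The window bounds printed in square brackets in the logs of this report can be reproduced with simple arithmetic. The following is a minimal plain-Java sketch, not Beam code; it assumes 30-second fixed windows aligned to the Unix epoch, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class FixedWindowBounds {
    // Start of the fixed window containing ts, assuming windows are aligned to the epoch.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // Exclusive end of the same window.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2017-11-16T14:51:44.294Z");
        Duration size = Duration.ofSeconds(30);
        System.out.println("[" + windowStart(ts, size) + ".." + windowEnd(ts, size) + ")");
    }
}
```

For the timestamp 14:51:44.294 this yields the window from 14:51:30 to 14:52:00, which is the window shown on the first Flink log line in the Pubsub section.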

      The expected standard output is outlined below.

      Let's assume all events are received in the same 30-second window.
      
      A -> After count KV{A, 1} -> Final group by KV{A, [1]}
      A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
      A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
      B -> After count KV{B, 1} -> Final group by KV{B, [1]}
      
      With my trigger configuration I would expect that, for every new element, 'After count' is printed with the new value, followed by 'Final group by' with the updated list of counts. 'Final group by' thus represents the history of counts.
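The expectation above can be modelled without Beam. The sketch below is only a plain-Java illustration of the expected semantics within a single window, under the assumption that every element produces one new count and one accumulated group. It is not the test code attached to this issue, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExpectedOutputModel {
    // For each element of one window, emit the running count for its key and
    // the accumulated history of counts for that key.
    static List<String> expectedLines(List<String> elements) {
        Map<String, List<Long>> history = new LinkedHashMap<>();
        List<String> lines = new ArrayList<>();
        for (String key : elements) {
            List<Long> counts = history.computeIfAbsent(key, k -> new ArrayList<>());
            long count = counts.size() + 1L; // running count for this key
            counts.add(count);               // accumulating panes keep earlier counts
            lines.add(key + " -> After count KV{" + key + ", " + count
                    + "} -> Final group by KV{" + key + ", " + counts + "}");
        }
        return lines;
    }

    public static void main(String[] args) {
        expectedLines(Arrays.asList("A", "A", "A", "B")).forEach(System.out::println);
    }
}
```

Feeding it A, A, A, B reproduces the four expected lines listed above, with the lists formatted as in the logs, for example [1, 2, 3].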

       

      Problem

      The 'Final group by' trigger doesn't always fire, although the trigger setup suggests it should. The behaviour differs between runners and Beam versions.

       

      My observations when using Pubsub

      Trigger configuration

      // assumes: import static org.joda.time.Duration.standardSeconds;
      Window.<String>into(FixedWindows.of(standardSeconds(30)))
          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
          .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
          .accumulatingFiredPanes()
      
      

       
      Beam 2.0 Flink Runner

      2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
      2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
      2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
      2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
      2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
      2017-11-16T14:52:14.951Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 'After count' after this
      2017-11-16T14:52:35.320Z Received Element A [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
      2017-11-16T14:52:35.426Z After count KV{A, 1} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
      2017-11-16T14:52:35.532Z Final group by KV{A, [1]} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
      

      Beam 2.0 Direct Runner

      2017-11-16T14:49:34.135Z Received Element A [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z)
      2017-11-16T14:49:34.324Z After count KV{A, 1} [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T14:49:37.526Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
      2017-11-16T14:49:37.535Z After count KV{A, 1} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T14:49:44.287Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
      2017-11-16T14:49:44.294Z After count KV{A, 2} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
      2017-11-16T14:49:47.991Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T14:49:47.995Z After count KV{A, 3} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T14:50:03.323Z Received Element A [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z)
      2017-11-16T14:50:03.328Z After count KV{A, 1} [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T14:51:14.309Z Received Element A [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z)
      2017-11-16T14:51:14.315Z After count KV{A, 1} [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z) <--- Expected to see 'Final group by' after this
      

       Beam 2.1 Flink Runner

      2017-11-16T15:13:02.747Z After count KV{A, 1} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
      2017-11-16T15:13:02.761Z Final group by KV{A, [1]} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
      2017-11-16T15:13:09.492Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:09.501Z After count KV{A, 1} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:09.608Z Final group by KV{A, [1]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:13.029Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:13.134Z After count KV{A, 2} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:13.240Z Final group by KV{A, [1, 2]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
      2017-11-16T15:13:15.420Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z) <--- Expected to see 'After count' after this
      2017-11-16T15:13:38.285Z Received Element A [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
      2017-11-16T15:13:38.379Z After count KV{A, 1} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
      2017-11-16T15:13:38.481Z Final group by KV{A, [1]} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
      

      Beam 2.1 Direct Runner

      2017-11-16T15:17:38.485Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:38.595Z After count KV{A, 1} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:38.608Z Final group by KV{A, [1]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:44.977Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:44.985Z After count KV{A, 2} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:44.988Z Final group by KV{A, [1, 2]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:51.126Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z) <--- Expected to see 'After count' after this
      2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:57.154Z After count KV{A, 3} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:57.158Z After count KV{A, 4} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:57.161Z After count KV{A, 5} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      2017-11-16T15:17:57.163Z Final group by KV{A, [1, 2, 3, 4, 5]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
      
      

      Trigger configuration

      Window.<String>into(FixedWindows.of(standardSeconds(30)))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
          .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
          .accumulatingFiredPanes()
      
      

      Beam 2.0 Flink Runner

      2017-11-16T14:54:14.017Z Received Element A [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
      2017-11-16T14:54:14.187Z After count KV{A, 1} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
      2017-11-16T14:54:14.205Z Final group by KV{A, [1]} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
      2017-11-16T14:54:38.499Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
      2017-11-16T14:54:38.604Z After count KV{A, 1} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
      2017-11-16T14:54:38.709Z Final group by KV{A, [1]} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
      2017-11-16T14:54:42.665Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
      2017-11-16T14:54:42.770Z After count KV{A, 2} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z) <--- Expected to see 'Final group by' after this. Output for the next window already appears, so is the 'Final group by' firing for this 30 s window lost forever?
      
      2017-11-16T14:55:06.131Z Received Element A [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
      2017-11-16T14:55:06.237Z After count KV{A, 1} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
      2017-11-16T14:55:06.342Z Final group by KV{A, [1]} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
      

      Beam 2.1 Flink Runner

      2017-11-16T15:11:09.666Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
      2017-11-16T15:11:09.838Z After count KV{A, 1} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
      2017-11-16T15:11:09.853Z Final group by KV{A, [1]} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
      2017-11-16T15:11:14.208Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T15:11:33.216Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      2017-11-16T15:11:33.322Z After count KV{A, 1} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      2017-11-16T15:11:33.327Z Final group by KV{A, [1]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      2017-11-16T15:11:54.740Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      2017-11-16T15:11:54.843Z After count KV{A, 2} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      2017-11-16T15:11:54.947Z Final group by KV{A, [1, 2]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
      

      My observations when using Kinesis Stream

      Trigger configuration

      Window.<String>into(FixedWindows.of(standardSeconds(30)))
          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
          .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
          .accumulatingFiredPanes()
      
      

      Beam 2.1 Direct Runner

      2017-11-16T10:56:33.241Z A
      2017-11-16T10:56:33.460Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:56:33.475Z Final group by KV{A, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:56:37.916Z B
      2017-11-16T10:56:37.950Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:56:37.956Z Final group by KV{B, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:57:05.388Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:57:05.388Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:57:05.392Z Final group by KV{B, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      2017-11-16T10:57:05.392Z Final group by KV{A, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
      

      Beam 2.0 Direct Runner

      2017-11-16T10:55:11.851Z A
      2017-11-16T10:55:11.854Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z) --
      2017-11-16T10:55:18.329Z B
      2017-11-16T10:55:18.333Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
      2017-11-16T10:55:35.195Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
      2017-11-16T10:55:35.196Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
      2017-11-16T10:55:35.199Z Final group by KV{A, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
      2017-11-16T10:55:35.199Z Final group by KV{B, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
      

      Beam 2.0 Flink Runner

      2017-11-16T11:00:04.820Z A
      2017-11-16T11:00:04.838Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:04.943Z Final group by KV{A, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:10.105Z B
      2017-11-16T11:00:10.138Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:30.188Z Final group by KV{B, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:35.190Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:35.191Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:35.295Z Final group by KV{A, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- Why is 'Final group by' triggered only after the allowed lateness, at 11:00:35?
      
      2017-11-16T11:00:35.297Z Final group by KV{B, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
      2017-11-16T11:00:35.298Z Final group by KV{A, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- The allowed lateness configuration (FIRE_IF_NON_EMPTY) dictates that only non-empty panes should fire!
      
      2017-11-16T11:00:35.298Z Final group by KV{B, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
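To read the timestamps in the log above against the window [11:00:00..11:00:30) and the 5-second allowed lateness, the following plain-Java sketch classifies a firing time relative to the end of the window. It is only a reading aid: it compares the logged wall-clock time with an event-time bound, treats the bound as simply the end of the window plus the allowed lateness, and uses hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

final class FiringPhase {
    enum Phase { BEFORE_WINDOW_END, WITHIN_ALLOWED_LATENESS, AFTER_ALLOWED_LATENESS }

    // Classify a logged firing time against the window end and the allowed lateness.
    static Phase classify(Instant firedAt, Instant windowEnd, Duration allowedLateness) {
        if (firedAt.isBefore(windowEnd)) {
            return Phase.BEFORE_WINDOW_END;
        }
        if (firedAt.isBefore(windowEnd.plus(allowedLateness))) {
            return Phase.WITHIN_ALLOWED_LATENESS;
        }
        return Phase.AFTER_ALLOWED_LATENESS;
    }

    public static void main(String[] args) {
        Instant end = Instant.parse("2017-11-16T11:00:30Z");
        Duration lateness = Duration.ofSeconds(5);
        System.out.println(classify(Instant.parse("2017-11-16T11:00:30.188Z"), end, lateness));
        System.out.println(classify(Instant.parse("2017-11-16T11:00:35.295Z"), end, lateness));
    }
}
```

Under these assumptions the firing for B at 11:00:30.188 falls within the allowed lateness, while the firings at 11:00:35 come only after end of window plus allowed lateness, which is the behaviour questioned in the annotations.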
      

      Trigger configuration

      Window.<String>into(FixedWindows.of(standardSeconds(30)))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
          .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
          .accumulatingFiredPanes()
      
      

      Beam 2.1 Direct Runner

      2017-11-16T11:14:36.754Z A
      2017-11-16T11:14:36.912Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:14:36.925Z Final group by KV{A, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:14:45.431Z B
      2017-11-16T11:14:45.437Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:14:45.440Z Final group by KV{B, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:15:00.034Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:15:00.035Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:15:00.039Z Final group by KV{A, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      2017-11-16T11:15:00.040Z Final group by KV{B, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
      

      Beam 2.0 Direct Runner

      2017-11-16T11:05:12.562Z A
      2017-11-16T11:05:12.565Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      2017-11-16T11:05:15.326Z B
      2017-11-16T11:05:15.330Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      2017-11-16T11:05:30.456Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      2017-11-16T11:05:30.457Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      2017-11-16T11:05:30.459Z Final group by KV{A, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      2017-11-16T11:05:30.459Z Final group by KV{B, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
      

      Beam 2.0 Flink Runner - run 1

      2017-11-16T11:06:32.634Z A
      2017-11-16T11:06:32.797Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:06:32.812Z Final group by KV{A, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:06:36.449Z B
      2017-11-16T11:06:36.550Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:06:36.654Z Final group by KV{B, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.153Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.155Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.258Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.260Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.262Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      2017-11-16T11:07:00.263Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
      

      Beam 2.0 Flink Runner - run 2

      2017-11-16T11:09:41.538Z A
      2017-11-16T11:09:41.618Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:09:41.687Z Final group by KV{A, [1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:09:45.671Z B
      2017-11-16T11:09:45.681Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:10:00.102Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:10:00.103Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:10:00.201Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:10:00.202Z Final group by KV{A, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      2017-11-16T11:10:00.203Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
      

      Beam 2.0 Flink Runner - run 3

      2017-11-16T11:08:09.474Z A
      2017-11-16T11:08:09.558Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      2017-11-16T11:08:09.646Z Final group by KV{A, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      2017-11-16T11:08:14.541Z B
      2017-11-16T11:08:30.255Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:08:30.256Z After count KV{B, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      2017-11-16T11:08:30.307Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      2017-11-16T11:08:30.309Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      2017-11-16T11:08:30.310Z Final group by KV{B, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
      

      Beam 2.0 Flink Runner - run 4

      2017-11-16T11:07:44.814Z A
      2017-11-16T11:07:44.841Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:07:48.883Z B
      2017-11-16T11:07:48.901Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:08:00.099Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:08:00.101Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
      2017-11-16T11:08:00.204Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
      2017-11-16T11:08:00.205Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
      2017-11-16T11:08:00.206Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
      2017-11-16T11:08:00.207Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
      

       

       Workaround

      As the test code shows, I redefined the first trigger just before the second GroupByKey transformation, and I started getting the expected results on my local machine with both the Direct runner and the Flink runner. However, when I deployed the job to a Flink cluster, the 'Final group by' trigger sometimes still didn't fire.

       

      My intuition
      I guess there is a bug in how Beam handles After(Synchronized)ProcessingTime triggers. The AfterWatermark trigger always works as expected. Interestingly, AfterProcessingTime triggers fire at different times in Beam 2.0 and 2.1.
      I am a bit worried that this bug might still be present in Beam 2.2, even if it occurs less frequently there.

            People

              Unassigned Unassigned
              pawelbartoszek Pawel Bartoszek