  1. Beam
  2. BEAM-3414

AfterProcessingTime trigger issue with Flink Runner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: Not applicable
    • Component/s: runner-core, runner-flink
    • Labels: None
    • Environment: IDEA, Ubuntu 16.04, FlinkRunner

    Description

      In my demo, I read data from Kafka, count the elements globally, and output the total count of received data, as follows:

              FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                      .as(FlinkPipelineOptions.class);
      
              options.setStreaming(true);
              options.setRunner(FlinkRunner.class);
              Pipeline pipeline = Pipeline.create(options);
              pipeline
                      .apply("Read from kafka",
                              KafkaIO.<String, String>read()
      //                                .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
                                      .withBootstrapServers("localhost:9092")
                                      .withTopic("recharge")
                                      .withKeyDeserializer(StringDeserializer.class)
                                      .withValueDeserializer(StringDeserializer.class)
                                      .withoutMetadata()
                      )
                      .apply(Values.create())
                      .apply(Window.<String>into(new GlobalWindows())
                                      .triggering(Repeatedly.forever(
                                              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
                                      .accumulatingFiredPanes()
                      )
                      .apply(Count.globally())
                      .apply("output",
                              ParDo.of(new DoFn<Long, Void>() {
                                  @ProcessElement
                                  public void process(ProcessContext context) {
                                      System.out.println("---get at: " + Instant.now() + "------");
                                      System.out.println(context.element());
                                  }
                              }));
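
For reference, the firing pattern this trigger is expected to produce can be modeled in plain Java without any Beam dependency. This is only a sketch of the documented semantics of `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...))` with `accumulatingFiredPanes()`, not runner code. The class name `ExpectedFirings`, the helper `simulate`, and the simplification that each batch of 681 messages arrives at a single instant are assumptions made for illustration; the send times are taken from the test described in this report and are assumed to be UTC.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java model of the expected trigger behavior (not Beam code). */
final class ExpectedFirings {

    /** One fired pane: the instant it fires and the accumulated count it reports. */
    static final class Pane {
        final Instant firedAt;
        final long count;

        Pane(Instant firedAt, long count) {
            this.firedAt = firedAt;
            this.count = count;
        }
    }

    /**
     * The first batch after a firing arms a timer at arrival + delay. When the
     * timer expires the pane fires, and counts keep accumulating across panes.
     * Arrivals must be sorted; each batch is assumed to arrive at one instant.
     */
    static List<Pane> simulate(List<Instant> arrivals, long batchSize, Duration delay) {
        List<Pane> panes = new ArrayList<>();
        Instant timer = null;
        long total = 0;
        for (Instant arrival : arrivals) {
            // A timer that expired before this batch arrived has already fired.
            if (timer != null && !timer.isAfter(arrival)) {
                panes.add(new Pane(timer, total));
                timer = null;
            }
            total += batchSize;
            if (timer == null) {
                timer = arrival.plus(delay); // first element in the new pane
            }
        }
        if (timer != null) {
            panes.add(new Pane(timer, total)); // last pending timer fires on its own
        }
        return panes;
    }

    public static void main(String[] args) {
        List<Instant> sends = Arrays.asList(
                Instant.parse("2018-01-05T06:34:31.436Z"),
                Instant.parse("2018-01-05T06:34:47.166Z"),
                Instant.parse("2018-01-05T06:34:55.505Z"),
                Instant.parse("2018-01-05T06:35:22.068Z"));
        for (Pane p : simulate(sends, 681, Duration.ofSeconds(5))) {
            System.out.println(p.firedAt + " -> " + p.count);
        }
    }
}
```

Under these assumptions the model fires four panes, at 06:34:36.436, 06:34:52.166, 06:35:00.505 and 06:35:27.068, with counts 681, 1362, 2043 and 2724. Real firings would lag slightly behind these instants because of Kafka and runner latency.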
      

      The result should be displayed 5 seconds after I send the first data, but sometimes nothing is displayed after I send data. Here is the output I got in one test (I can't upload a picture, so it is reproduced as text):

      Send 681Msg at: 2018-01-05T06:34:31.436
      
      	---get at: 2018-01-05T06:34:36.668Z------
      	681
      
      Send 681Msg at: 2018-01-05T06:34:47.166
      
      	---get at: 2018-01-05T06:34:52.284Z------
      	1362
      
      Send 681Msg at: 2018-01-05T06:34:55.505
      
      Send 681Msg at: 2018-01-05T06:35:22.068
      
      	---get at: 2018-01-05T06:35:22.112Z------
      	2044
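
To make the gap in the log above easier to see, the delay between each send and the next output can be computed directly from the timestamps. The sketch below is self-contained Java; the class name `ObservedDelays` and the helper `delayMillis` are hypothetical, and it assumes the "Send" timestamps (which carry no zone in the log) are in UTC like the "get at" timestamps.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical helper that measures send-to-output delays from the log above. */
final class ObservedDelays {

    /**
     * Milliseconds between a "Send" timestamp (no zone in the log, UTC assumed)
     * and a "get at" timestamp (ISO-8601 with trailing Z).
     */
    static long delayMillis(String sentLocal, String gotUtc) {
        Instant sent = LocalDateTime.parse(sentLocal).toInstant(ZoneOffset.UTC);
        Instant got = Instant.parse(gotUtc);
        return Duration.between(sent, got).toMillis();
    }

    public static void main(String[] args) {
        // First and second batches: output arrives roughly 5 s after the send.
        System.out.println(delayMillis("2018-01-05T06:34:31.436", "2018-01-05T06:34:36.668Z")); // 5232
        System.out.println(delayMillis("2018-01-05T06:34:47.166", "2018-01-05T06:34:52.284Z")); // 5118
        // Third batch: the next output appears only after the fourth send.
        System.out.println(delayMillis("2018-01-05T06:34:55.505", "2018-01-05T06:35:22.112Z")); // 26607
        // Fourth batch: output appears 44 ms after the send, not 5 s.
        System.out.println(delayMillis("2018-01-05T06:35:22.068", "2018-01-05T06:35:22.112Z")); // 44
    }
}
```

The first two outputs arrive about 5.2 s and 5.1 s after their sends, as expected. The third batch produces no output for about 26.6 s, and the next pane appears 44 ms after the fourth send with a count of 2044, which is 3 × 681 + 1. That would be consistent with the pending firing being delivered only once new input arrived, although the log alone does not prove the cause.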
      

      By the way, the same code works fine with the DirectRunner.

            People

              Assignee: Dawid Wysakowicz (dwysakowicz)
              Reporter: huangjianhuang