Details

    • Bug
    • Status: Open
    • P3
    • Resolution: Unresolved
    • beam-model

    Description

      Hi all,

      I think I have run into an issue with windowing when reading from Cloud Pub/Sub via PubsubIO.

      FixedWindows do not appear to fire on either DirectRunner or DataflowRunner after a GroupByKey transform.

      A basic pipeline for my use case looks like this:

       

      Pipeline p = Pipeline.create();
      
      PubsubIO.Read<PubsubMessage> read = PubsubIO
          .readMessagesWithAttributes()
          .withTimestampAttribute("time")
          .fromTopic("test-topic");
      
      // Element type is PubsubMessage so the window can be applied directly to the read output.
      Window<PubsubMessage> window = Window.<PubsubMessage>into(
              FixedWindows.of(Duration.standardSeconds(10L)))
          .triggering(AfterWatermark.pastEndOfWindow())
          .withAllowedLateness(Duration.standardSeconds(10L))
          .discardingFiredPanes();
      
      PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
          .apply("Read Events", read)
          .apply("Apply Window", window)
          .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
          .apply("Group by key", GroupByKey.<String, PubsubMessage>create())
          .apply("Log Pairs", ParDo.of(new LogGroupedEvents()));

       

      LogGroupedEvents should log each key as a string together with the PubsubMessages grouped under it, but this function never runs correctly.

      I have simplified the pipeline to demonstrate the issue and removed the actual use case, so grouping and logging plain messages may look odd; it is not what the real pipeline does.

       

      If I swap the windowing function for the one below, which uses GlobalWindows with a repeated processing-time trigger, it works correctly:

      // Global window that fires repeatedly, `duration` seconds of processing time
      // after the first element of each pane.
      Window<PubsubMessage> getDefaultWindow(Long duration) {
          return Window.<PubsubMessage>into(new GlobalWindows())
              .triggering(Repeatedly.forever(
                  AfterProcessingTime
                      .pastFirstElementInPane()
                      .plusDelayOf(Duration.standardSeconds(duration))))
              .withAllowedLateness(Duration.standardSeconds(10L))
              .discardingFiredPanes();
      }
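
As a rough illustration of why a trigger of this kind emits output regardless of the timestamps carried by the messages, the sketch below models a repeated processing-time trigger in plain Java, without Beam. The class and method names (`ProcessingTimeTriggerSketch`, `firingTimes`) are hypothetical, and the model ignores keys, bundling and runner behaviour. It only assumes that a pane fires once the processing-time clock reaches the arrival time of the pane's first element plus the delay.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessingTimeTriggerSketch {

    /**
     * Simplified model (not Beam code): given element arrival times in
     * processing time (milliseconds, ascending), return the processing
     * times at which panes would fire when each pane fires delayMillis
     * after its first element arrived, repeatedly.
     */
    static List<Long> firingTimes(long[] arrivalsMillis, long delayMillis) {
        List<Long> firings = new ArrayList<>();
        long paneDeadline = -1L; // -1 means no pane is currently open
        for (long arrival : arrivalsMillis) {
            // The open pane fired before this element arrived.
            if (paneDeadline >= 0 && arrival >= paneDeadline) {
                firings.add(paneDeadline);
                paneDeadline = -1L;
            }
            // The first element of a new pane starts the delay timer.
            if (paneDeadline < 0) {
                paneDeadline = arrival + delayMillis;
            }
        }
        // The last open pane still fires once its deadline passes.
        if (paneDeadline >= 0) {
            firings.add(paneDeadline);
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 3_000L, 12_000L, 15_000L, 40_000L};
        System.out.println(firingTimes(arrivals, 10_000L));
    }
}
```

In this model the firing times depend only on when elements arrive, not on any event timestamp or watermark.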
      

       

      This could be because I do not fully understand windowing and triggers, but according to the documentation and many examples online, a simple FixedWindows is all that people use, because a trigger is supposed to fire automatically at the end of the window, per the Beam docs:

       

      https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function

      See the example in section 7.3.1.
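
The expectation described above can be sketched in plain Java, without Beam: an element is assigned to a fixed window based on its event timestamp, and a watermark trigger fires for that window once the watermark reaches the window's end. The class and method names (`FixedWindowSketch`, `windowStart`, `windowEnd`, `watermarkTriggerCanFire`) are hypothetical, a zero window offset is assumed, and the exact boundary handling is a simplification rather than a statement about Beam's internals.

```java
public class FixedWindowSketch {

    /** Start (inclusive, ms) of the fixed window containing the event timestamp. */
    static long windowStart(long eventMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps too.
        return eventMillis - Math.floorMod(eventMillis, sizeMillis);
    }

    /** End (exclusive, ms) of the fixed window containing the event timestamp. */
    static long windowEnd(long eventMillis, long sizeMillis) {
        return windowStart(eventMillis, sizeMillis) + sizeMillis;
    }

    /**
     * Simplified firing condition: the window holding the event can fire
     * only once the watermark has reached the end of that window.
     */
    static boolean watermarkTriggerCanFire(long watermarkMillis, long eventMillis, long sizeMillis) {
        return watermarkMillis >= windowEnd(eventMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L;   // 10-second windows, as in the pipeline
        long event = 25_000L;  // event timestamp taken from the message
        System.out.println(windowStart(event, size) + " .. " + windowEnd(event, size));
        System.out.println(watermarkTriggerCanFire(29_000L, event, size));
        System.out.println(watermarkTriggerCanFire(31_000L, event, size));
    }
}
```

In this model a window produces output only after the watermark has moved past its end, so a watermark that does not advance would leave the window unfired.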

       

      I have researched how windowing works internally as much as I can. We arrived at our triggering solution by reading the source code.

       

      Let me know if there is any other information you need from me to help look into this.

          People

            Unassigned
            gregorskii Gregory Parsons