Beam / BEAM-7995

IllegalStateException: TimestampCombiner moved element from to earlier time in Python

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • None
    • Missing
    • Component/s: sdk-py-core
    • None

    Description

      I'm looking into a bug I found internally when using the Beam portable API (Python) on our own Samza runner.
       
      The pipeline looks something like this:
       
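          import apache_beam as beam
          from apache_beam.transforms.window import FixedWindows

          # ReadFromKafka: the reporter's Kafka source; its import is not shown in the report.
          (p
           | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
           | 'transform' >> beam.Map(process_event)  # must emit (key, value) pairs
           | 'window' >> beam.WindowInto(FixedWindows(15))  # 15-second fixed windows
           | 'group' >> beam.CombinePerKey(beam.combiners.CountCombineFn())
           ...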
       
      The problem comes from the combiner, which causes the following exception on the Java side:
       
      Caused by: java.lang.IllegalStateException: TimestampCombiner moved element from 2019-08-15T03:34:45.000Z to earlier time 2019-08-15T03:34:44.999Z for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)
          at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
          at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
          at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
          at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
          at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
       
      The exception is thrown here: https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116. That line checks that the shifted timestamp is not before the original timestamp.
       
          if (shifted.isBefore(timestamp)) {
            throw new IllegalStateException(
                String.format(
                    "TimestampCombiner moved element from %s to earlier time %s for window %s",
                    BoundedWindow.formatTimestamp(timestamp),
                    BoundedWindow.formatTimestamp(shifted),
                    window));
          }
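      To make the invariant concrete, here is a minimal Python model of that check. It is a sketch, not Beam code: timestamps are plain epoch milliseconds (the resolution of a Java Instant), and the names to_millis, fmt, IllegalStateError and check_shift are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class IllegalStateError(Exception):
    """Stand-in for java.lang.IllegalStateException."""


def to_millis(dt: datetime) -> int:
    # Epoch milliseconds, the resolution of a Java Instant.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def fmt(ms: int) -> str:
    # ISO-8601 with millisecond precision, as printed in the exception above.
    iso = (EPOCH + timedelta(milliseconds=ms)).isoformat(timespec="milliseconds")
    return iso.replace("+00:00", "Z")


def check_shift(timestamp: int, shifted: int, window: str) -> int:
    # Mirrors `if (shifted.isBefore(timestamp)) throw ...`: the combiner may
    # leave a hold where it is or move it later, but never earlier.
    if shifted < timestamp:
        raise IllegalStateError(
            "TimestampCombiner moved element from %s to earlier time %s for window %s"
            % (fmt(timestamp), fmt(shifted), window))
    return shifted


ts = to_millis(datetime(2019, 8, 15, 3, 34, 45, tzinfo=timezone.utc))
window = "[2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)"
try:
    check_shift(ts, ts - 1, window)  # END_OF_WINDOW shift: 45.000 -> 44.999
except IllegalStateError as err:
    print(err)
```

      Running it prints the same message as the stack trace in this report.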

       
      As the exception shows, "shifted" is 03:34:44.999 while "timestamp" is 03:34:45.000. The 44.999 comes from TimestampCombiner.END_OF_WINDOW:
       
          @Override
          public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
            return intoWindow.maxTimestamp();
          }

       
      where intoWindow.maxTimestamp() is:
       
          /** Returns the largest timestamp that can be included in this window. */
          @Override
          public Instant maxTimestamp() {
            // end not inclusive
            return end.minus(1);
          }

       
      Hence, the "44.999". 
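      A minimal Python model of the two Java pieces above, assuming a half-open window measured in milliseconds (here as offsets from 03:34:00.000Z to keep the numbers small). IntervalWindow and end_of_window_merge are hypothetical stand-ins for maxTimestamp() and the END_OF_WINDOW merge, not Beam APIs. The model also shows that 45.000 lies outside [30.000, 45.000).

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class IntervalWindow:
    """Hypothetical model of a half-open window [start, end), in milliseconds."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # End is not inclusive, so the last millisecond inside is end - 1.
        return self.end - 1

    def contains(self, ts: int) -> bool:
        return self.start <= ts < self.end


def end_of_window_merge(into_window: IntervalWindow, merging_timestamps: Iterable[int]) -> int:
    # END_OF_WINDOW ignores the merged timestamps and always picks the max timestamp.
    return into_window.max_timestamp()


# Offsets in ms from 03:34:00.000Z: the window [03:34:30.000, 03:34:45.000).
w = IntervalWindow(start=30_000, end=45_000)
print(end_of_window_merge(w, [45_000]))  # 44999, i.e. 03:34:44.999
print(w.contains(45_000))                # False: 45.000 is outside the window
```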
       
      The 45.000, on the other hand, comes from the Python side, where the lifted combiner outputs its partial results as a pre-GBK operation: operations.py#PGBKCVOperation#output_key
       
          if windows is 0:
            self.output(_globally_windowed_value.with_value((key, value)))
          else:
            self.output(WindowedValue((key, value), windows[0].end, windows))  # timestamp = exclusive window end
       
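      Here, when the windowed value is generated, its timestamp is set to the window's exclusive end (45.000) rather than to the largest timestamp the window contains (44.999, which is what Java's maxTimestamp() returns). Because the window is half-open, [03:34:30.000, 03:34:45.000), the element is stamped with a time that is not even inside its own window.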
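      Putting the two sides together, the sketch below (plain Python, millisecond integers, hypothetical names) contrasts the timestamp emitted by output_key, windows[0].end, with the window's max timestamp. Emitting the max timestamp is shown only as one way to make the two sides agree; it is an illustration, not necessarily how this issue was resolved.

```python
from typing import NamedTuple


class Window(NamedTuple):
    """Half-open window [start, end) in milliseconds (simplified model)."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        return self.end - 1  # same rule as the Java maxTimestamp() above


def pgbk_output_timestamp(window: Window, use_max_timestamp: bool) -> int:
    # False mimics `windows[0].end` in output_key; True emits the last
    # millisecond that the window actually contains.
    return window.max_timestamp() if use_max_timestamp else window.end


def add_end_of_window_hold(timestamp: int, window: Window) -> int:
    # END_OF_WINDOW shifts the hold to the window's max timestamp, and
    # WatermarkHold rejects any shift to an earlier time.
    shifted = window.max_timestamp()
    if shifted < timestamp:
        raise RuntimeError(f"moved element from {timestamp} to earlier time {shifted}")
    return shifted


w = Window(start=30_000, end=45_000)  # ms offsets from 03:34:00.000Z
print(add_end_of_window_hold(pgbk_output_timestamp(w, use_max_timestamp=True), w))  # 44999
try:
    add_end_of_window_hold(pgbk_output_timestamp(w, use_max_timestamp=False), w)
except RuntimeError as err:
    print(err)  # moved element from 45000 to earlier time 44999
```

      Note that the Python SDK's own Timestamp type has microsecond resolution; this model deliberately uses Java's millisecond resolution to mirror the check that fails.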
       
      Clearly the "end of window" definition is inconsistent between Python and Java. I have yet to try this on other runners, so I'm not sure whether this only affects our Samza runner. I tend to think this is a bug, but would like to confirm with you. If this has not been an issue for other runners, what might I have done wrong?

            People

              Assignee: Hai Lu (lhaiesp)
              Reporter: Hai Lu (lhaiesp)
              Votes: 0
              Watchers: 2

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 2h 50m