Beam / BEAM-10617

Python CombineGlobally().with_fanout() causes duplicate combine results for sliding windows

Details

    Description

      Not only is there more than one result per window, but the results for each window are also duplicated.
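Conceptually, with_fanout(n) spreads each window's inputs over up to n intermediate partial combines and then merges their accumulators. With a ListFn whose merge simply concatenates lists, that should give the same result as combining without fanout (up to element order): one list per window, containing each element exactly once. The following minimal sketch checks that invariant in plain Python, without importing Beam. The round-robin split and the helpers combine_direct and combine_with_fanout are hypothetical illustrations, not Beam's actual implementation.

```python
from typing import Iterable, List


class ListFn:
    """Plain-Python copy of the report's ListFn (no beam.CombineFn base)."""

    def create_accumulator(self) -> List[int]:
        return []

    def add_input(self, acc: List[int], element: int) -> List[int]:
        return acc + [element]

    def merge_accumulators(self, accumulators: Iterable[List[int]]) -> List[int]:
        merged: List[int] = []
        for acc in accumulators:
            merged = merged + acc
        return merged

    def extract_output(self, acc: List[int]) -> List[int]:
        return acc


def combine_direct(fn, values):
    """Combine one window's values with a single accumulator."""
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return fn.extract_output(acc)


def combine_with_fanout(fn, values, fanout):
    """Simulate fanout for one window: split the values round-robin over
    `fanout` partial accumulators, then merge them into ONE output."""
    if fanout < 1:
        raise ValueError("fanout must be >= 1")
    partials = [fn.create_accumulator() for _ in range(fanout)]
    for index, v in enumerate(values):
        slot = index % fanout  # hypothetical split; Beam picks sub-keys differently
        partials[slot] = fn.add_input(partials[slot], v)
    return fn.extract_output(fn.merge_accumulators(partials))


if __name__ == "__main__":
    window_values = [1, 2, 3, 4, 5, 6, 7]  # e.g. one window's inputs
    direct = combine_direct(ListFn(), window_values)
    fanned = combine_with_fanout(ListFn(), window_values, fanout=5)
    # Same elements, each exactly once; only the order may differ.
    assert sorted(fanned) == sorted(direct)
    print(direct, fanned)
```

A correct fanout may reorder elements within a window's list, but it should never emit a second list for the same window or repeat an element; this report shows both happening.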

      Here is code that reproduces the issue. Run it with and without .with_fanout(5) and compare the output.

      If running on the Dataflow runner, pass an appropriate gs://path/ to WriteToText.

       

      import apache_beam as beam
      from apache_beam.transforms import window
      from apache_beam.utils.timestamp import Timestamp
      
      class ListFn(beam.CombineFn):
        """Collects all elements of a window into one list."""
        def create_accumulator(self):
          return []
      
        def add_input(self, mutable_accumulator, element):
          return mutable_accumulator + [element]
      
        def merge_accumulators(self, accumulators):
          res = []
          for accu in accumulators:
            res = res + accu
          return res
      
        def extract_output(self, accumulator):
          return accumulator
      
      
      p = beam.Pipeline()
      
      (
          p
          | beam.Create([
            window.TimestampedValue(1, Timestamp(seconds=1596216396)),
            window.TimestampedValue(2, Timestamp(seconds=1596216397)),
            window.TimestampedValue(3, Timestamp(seconds=1596216398)),
            window.TimestampedValue(4, Timestamp(seconds=1596216399)),
            window.TimestampedValue(5, Timestamp(seconds=1596216400)),
            window.TimestampedValue(6, Timestamp(seconds=1596216402)),
            window.TimestampedValue(7, Timestamp(seconds=1596216403)),
            window.TimestampedValue(8, Timestamp(seconds=1596216405))])
          | beam.WindowInto(window.SlidingWindows(10, 5))  # 10 s windows, a new one every 5 s
          # Run with and without .with_fanout(5) to compare the results.
          | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
          | beam.Map(repr)
          | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))
      
      p.run().wait_until_finish()  # block until the pipeline finishes
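For reference, this plain-Python sketch computes what the pipeline should write without fanout: exactly one list per sliding window. It assumes SlidingWindows(10, 5) with the default offset of 0 and whole-second timestamps, and it reimplements the window-assignment arithmetic by hand instead of calling Beam, so sliding_windows and expected_results are hypothetical helpers. Beam does not guarantee element order inside each list.

```python
from collections import defaultdict

SIZE, PERIOD = 10, 5  # matches window.SlidingWindows(10, 5) in the report

# (value, event-time seconds) pairs copied from the beam.Create([...]) input
ELEMENTS = [
    (1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
    (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405),
]


def sliding_windows(ts, size=SIZE, period=PERIOD, offset=0):
    """Return the [start, end) windows containing ts, latest start first."""
    start = ts - (ts - offset) % period
    return [(s, s + size) for s in range(start, ts - size, -period)]


def expected_results(elements):
    """Group values per window: what one ListFn output per window holds."""
    per_window = defaultdict(list)
    for value, ts in elements:
        for w in sliding_windows(ts):
            per_window[w].append(value)
    return dict(sorted(per_window.items()))


if __name__ == "__main__":
    for (start, end), values in expected_results(ELEMENTS).items():
        print(f"[{start}, {end}): {values}")
```

It prints four windows:

[1596216390, 1596216400): [1, 2, 3, 4]
[1596216395, 1596216405): [1, 2, 3, 4, 5, 6, 7]
[1596216400, 1596216410): [5, 6, 7, 8]
[1596216405, 1596216415): [8]

so the output file should contain exactly these four lists (in some order). Any extra or repeated lists are the symptom reported here.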
      

       

          People

            Assignee: Unassigned
            Reporter: Leiyi Zhang (leiyiz)
            Votes: 0
            Watchers: 5
