BEAM-12552: GroupBy and GroupByKey don't group composite keys

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Duplicate
    • Affects Version/s: 2.24.0
    • Fix Version/s: Not applicable
    • Component/s: beam-model, sdk-py-core
    • Environment:
      1. macOS Big Sur 11.4, Python 3.9
      2. Google Dataflow, Python 3.9

    Description

      Background

      GroupByKey and GroupBy work for simple keys, but they don't group composite keys, i.e. keys that are themselves dictionaries.

      Example:

      import apache_beam as beam
      
      
      def run():
          # The pipeline runs automatically when the `with` block exits, so an
          # explicit pipeline.run() call inside the block is not needed.
          with beam.Pipeline() as pipeline:
              (
                  pipeline
                  | 'Create Dataset' >> beam.Create([
                      ({'key': {'image_id': 1, 'context_id': 2}}, 1),
                      ({'key': {'image_id': 2, 'context_id': 2}}, 2),
                      ({'key': {'image_id': 3, 'context_id': 2}}, 3),
                      ({'key': {'image_id': 4, 'context_id': 2}}, 4),
                      ({'key': {'image_id': 5, 'context_id': 2}}, 5),
                      ({'key': {'context_id': 2, 'image_id': 1}}, 6),
                      ({'key': {'context_id': 2, 'image_id': 2}}, 7),
                      ({'key': {'context_id': 2, 'image_id': 3}}, 8),
                      ({'key': {'context_id': 2, 'image_id': 4}}, 9),
                      ({'key': {'context_id': 2, 'image_id': 5}}, 10),
                  ])
                  | 'Group By Key' >> beam.GroupByKey()
                  | 'Print Stuff' >> beam.Map(print)
              )
      
      
      if __name__ == '__main__':
          run()
      
      

      Note that some records have semantically identical keys whose fields simply appear in a different order.
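      A plausible explanation (hedged; not confirmed in this issue): GroupByKey compares keys by their encoded bytes rather than by Python `==`, and an encoding that walks a dict in insertion order produces different bytes for these two field orders. The plain-Python sketch below models that behaviour, using `pickle` only as a stand-in for Beam's key coder; `group_by_encoded_key` is a hypothetical helper, not a Beam API.

```python
import pickle
from collections import defaultdict

# Keys from the example above: the same fields, inserted in two different orders.
records = (
    [({'key': {'image_id': i, 'context_id': 2}}, i) for i in range(1, 6)]
    + [({'key': {'context_id': 2, 'image_id': i}}, i + 5) for i in range(1, 6)]
)


def group_by_encoded_key(pairs):
    """Group values by the pickled bytes of their key (stand-in for a coder)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[pickle.dumps(key)].append(value)
    return list(groups.values())


# The dicts compare equal, so grouping by equality would give 5 groups...
print(records[0][0] == records[5][0])  # True
# ...but their serialized forms differ, so grouping by bytes gives 10.
print(pickle.dumps(records[0][0]) == pickle.dumps(records[5][0]))  # False
print(len(group_by_encoded_key(records)))  # 10
```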

      The same issue occurs with GroupBy when a key function is used:

      | 'Group By' >> beam.GroupBy(lambda record: record[0]['key'])

      Expected output:

      ({'key': {'image_id': 1, 'context_id': 2}}, [1, 6])
      ({'key': {'image_id': 2, 'context_id': 2}}, [2, 7])
      ({'key': {'image_id': 3, 'context_id': 2}}, [3, 8])
      ({'key': {'image_id': 4, 'context_id': 2}}, [4, 9])
      ({'key': {'image_id': 5, 'context_id': 2}}, [5, 10])

       

      Actual output:

      ({'key': {'image_id': 1, 'context_id': 2}}, [1])
      ({'key': {'image_id': 2, 'context_id': 2}}, [2])
      ({'key': {'image_id': 3, 'context_id': 2}}, [3])
      ({'key': {'image_id': 4, 'context_id': 2}}, [4])
      ({'key': {'image_id': 5, 'context_id': 2}}, [5])
      ({'key': {'context_id': 2, 'image_id': 1}}, [6])
      ({'key': {'context_id': 2, 'image_id': 2}}, [7])
      ({'key': {'context_id': 2, 'image_id': 3}}, [8])
      ({'key': {'context_id': 2, 'image_id': 4}}, [9])
      ({'key': {'context_id': 2, 'image_id': 5}}, [10])
      

      Workaround:

      def _generate_key(key_object):
          # Build an order-independent string key (used only for the GroupBy)
          # by sorting the field names before joining "field:value" pairs.
          fields = sorted(key_object.keys())
          return ",".join(f"{field}:{key_object[field]}" for field in fields)
      
      ...
      
      ...| "Group" >> beam.GroupBy(lambda record: _generate_key(record[0]['key']))
      ...
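      As an alternative to the string key, the sketch below converts a dict key into a sorted tuple of `(field, value)` pairs, recursing into nested dicts. Unlike the joined string, it keeps value types apart (`1` and `'1'` would both become `image_id:1` in the string key). `canonical_key` and `group_values_by` are hypothetical helpers, and grouping is again modelled in plain Python with `pickle` standing in for the coder; in a pipeline the key function would presumably be passed as `beam.GroupBy(lambda record: canonical_key(record[0]['key']))`.

```python
import pickle
from collections import defaultdict


def canonical_key(value):
    """Recursively turn dicts into sorted tuples of (field, value) pairs.

    Hypothetical helper: assumes the field names of each dict are mutually
    comparable (e.g. all strings); lists and tuples are converted to tuples.
    """
    if isinstance(value, dict):
        return tuple(sorted((k, canonical_key(v)) for k, v in value.items()))
    if isinstance(value, (list, tuple)):
        return tuple(canonical_key(v) for v in value)
    return value


def group_values_by(pairs, key_fn):
    """Group values by the pickled bytes of key_fn(key), a stand-in for a coder."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[pickle.dumps(key_fn(key))].append(value)
    return sorted(groups.values())


records = (
    [({'key': {'image_id': i, 'context_id': 2}}, i) for i in range(1, 6)]
    + [({'key': {'context_id': 2, 'image_id': i}}, i + 5) for i in range(1, 6)]
)

print(group_values_by(records, lambda k: canonical_key(k['key'])))
# [[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]]
```

      Mixed-type field names (for example, a string and an int in the same dict) would make `sorted` raise a `TypeError`, so the helper only suits keys whose field names share one type.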
      

    People

      • Assignee: Unassigned
      • Reporter: Praneeth Peiris (gnomezgrave)
      • Votes: 0
      • Watchers: 5
