Details
- Type: Bug
- Status: Resolved
- Priority: P3
- Resolution: Duplicate
- Affects Version: 2.24.0
- Environment:
  1. macOS Big Sur 11.4, Python 3.9
  2. Google Dataflow, Python 3.9
Description
Background
Even though GroupByKey and GroupBy work for simple key fields, they do not work for composite keys where the key is itself a dictionary.
Example:
import apache_beam as beam

def run(consumer_args, pipeline_args):
    with beam.Pipeline() as pipeline:
        (pipeline
         | 'Create Dataset' >> beam.Create([
             ({'key': {'image_id': 1, 'context_id': 2}}, 1),
             ({'key': {'image_id': 2, 'context_id': 2}}, 2),
             ({'key': {'image_id': 3, 'context_id': 2}}, 3),
             ({'key': {'image_id': 4, 'context_id': 2}}, 4),
             ({'key': {'image_id': 5, 'context_id': 2}}, 5),
             ({'key': {'context_id': 2, 'image_id': 1}}, 6),
             ({'key': {'context_id': 2, 'image_id': 2}}, 7),
             ({'key': {'context_id': 2, 'image_id': 3}}, 8),
             ({'key': {'context_id': 2, 'image_id': 4}}, 9),
             ({'key': {'context_id': 2, 'image_id': 5}}, 10),
         ])
         | 'Group By Key' >> beam.GroupByKey()
         | 'Print Stuff' >> beam.Map(print))
Note there are records with the same keys (semantically) but in a different order.
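The two orderings compare equal as Python dicts, but their serialized bytes differ, which is consistent with grouping on encoded key bytes rather than on dict equality. A minimal illustration (using pickle only to show order-sensitive encoding; Beam's actual coder may differ):

```python
import pickle

# Semantically identical keys, inserted in different field order.
d1 = {'image_id': 1, 'context_id': 2}
d2 = {'context_id': 2, 'image_id': 1}

print(d1 == d2)                              # True: dicts compare by contents
print(pickle.dumps(d1) == pickle.dumps(d2))  # False: bytes preserve insertion order
```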
The same issue occurs with GroupBy when we use a mapper function:
| 'Group By' >> beam.GroupBy(lambda record: record[0]['key'])
Expected output:
({'key': {'image_id': 1, 'context_id': 2}}, [1, 6])
({'key': {'image_id': 2, 'context_id': 2}}, [2, 7])
({'key': {'image_id': 3, 'context_id': 2}}, [3, 8])
({'key': {'image_id': 4, 'context_id': 2}}, [4, 9])
({'key': {'image_id': 5, 'context_id': 2}}, [5, 10])
Actual output:
({'key': {'image_id': 1, 'context_id': 2}}, [1])
({'key': {'image_id': 2, 'context_id': 2}}, [2])
({'key': {'image_id': 3, 'context_id': 2}}, [3])
({'key': {'image_id': 4, 'context_id': 2}}, [4])
({'key': {'image_id': 5, 'context_id': 2}}, [5])
({'key': {'context_id': 2, 'image_id': 1}}, [6])
({'key': {'context_id': 2, 'image_id': 2}}, [7])
({'key': {'context_id': 2, 'image_id': 3}}, [8])
({'key': {'context_id': 2, 'image_id': 4}}, [9])
({'key': {'context_id': 2, 'image_id': 5}}, [10])
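An alternative to a string key is to map each dict key to a tuple of its sorted items, which is hashable, deterministic, and order-independent. This is a sketch, not confirmed against the report; `to_deterministic_key` is a hypothetical helper that would run in a `beam.Map` before `GroupByKey`:

```python
def to_deterministic_key(element):
    """Replace the dict key with an order-independent, hashable tuple."""
    composite, value = element
    key = tuple(sorted(composite['key'].items()))  # sorted by field name
    return key, value

# Both orderings of the same logical key produce the identical tuple key.
print(to_deterministic_key(({'key': {'image_id': 1, 'context_id': 2}}, 1)))
print(to_deterministic_key(({'key': {'context_id': 2, 'image_id': 1}}, 6)))
```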
Workaround:
def _generate_key(key_object):
    # Generate an ordered string key only for the GroupBy
    fields = list(key_object.keys())
    fields.sort()
    return ",".join([f"{field}:{key_object[field]}" for field in fields])

...
... | "Group" >> beam.GroupBy(lambda record: _generate_key(record['key']))
...
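The key function in this workaround can be checked outside Beam: two dicts with the same fields in different insertion order should produce identical string keys.

```python
def _generate_key(key_object):
    # Generate an ordered string key only for the GroupBy
    fields = list(key_object.keys())
    fields.sort()
    return ",".join([f"{field}:{key_object[field]}" for field in fields])

k1 = _generate_key({'image_id': 1, 'context_id': 2})
k2 = _generate_key({'context_id': 2, 'image_id': 1})
print(k1)        # context_id:2,image_id:1
print(k1 == k2)  # True: field order no longer matters
```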
Issue Links
- duplicates BEAM-11719: Enforce deterministic coding for GroupByKey and Stateful DoFns (Triage Needed)