Details
- Type: Bug
- Status: Resolved
- Priority: P3
- Resolution: Fixed
- 2.13.0
- None
Description
The output type hints seem to be ignored when using a stateful DoFn, although the same type hint works as expected on a DoFn without state. This prevents a custom Coder from being used, because Beam falls back to one of the FastCoders (I believe the pickle-based one).
Example code:
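```python
from apache_beam import DoFn, typehints
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec

# Message is the reporter's own class with a custom Coder (definition not shown).


@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):
    COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

    def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
        (key, messages) = element
        newMessage = Message()
        return [newMessage]
```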
The example simply defines a stateful DoFn in Python. The runner used is the portable Flink runner (Flink 1.6.4).
Overriding infer_output_type so that it returns typehints.List[Message] works around the issue.
Looking at the code, it seems to me that the type hints are not taken into account at https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643.
Issue Links
- links to