Beam / BEAM-7678

typehints with_output_types annotation doesn't work for stateful DoFn

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: 2.13.0
    • Fix Version/s: 2.16.0
    • Component/s: sdk-py-core
    • Labels: None

    Description

      The output type hints appear to be ignored when using a stateful DoFn, although the same type hint works as expected without state. This prevents a custom Coder from being used, because Beam falls back to one of the FastCoders (I believe the pickle-based one).

      Example code:

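      from apache_beam import DoFn, typehints
      from apache_beam.coders import VarIntCoder
      from apache_beam.transforms.userstate import BagStateSpec

      # Message is the reporter's custom class (definition not shown).
      @typehints.with_output_types(Message)
      class StatefulDoFn(DoFn):

          COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

          def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
              key, messages = element
              new_message = Message()
              return [new_message]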
      

      The example simply defines a stateful DoFn in Python. The runner is the portable Flink runner, on Flink 1.6.4.

      Finally, overriding infer_output_type to return typehints.List[Message] works around the issue.

      Looking at the code, it seems that https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643 does not take the type hints into account.


    People

    • Assignee: Enrico Canzonieri (enricoc)
    • Reporter: Enrico Canzonieri (enricoc)
    • Votes: 0
    • Watchers: 2


    Time Tracking

    • Original Estimate: Not Specified
    • Remaining Estimate: 0h
    • Time Spent: 2h 20m
