Beam / BEAM-8782

Python typehints: with_output_types breaks multi-output dofns

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Component/s: sdk-py-core

    Description

        def test_typed_multi_pardo(self):
          p = TestPipeline()
          res = (p
                 | beam.Create([1, 2, 3])
                 | beam.Map(lambda e: e).with_outputs().with_output_types(int))
          self.assertIsNotNone(res[None].element_type)
          res_main = (res[None]
                      | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
          assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
          p.run()
      

      Fails with:

      typed_pipeline_test.py:212: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../pvalue.py:113: in __or__
          return self.pipeline.apply(ptransform, self)
      ../pipeline.py:528: in apply
          transform.type_check_outputs(pvalueish_result)
      ../transforms/ptransform.py:386: in type_check_outputs
          self.type_check_inputs_or_outputs(pvalueish, 'output')
      ../transforms/ptransform.py:401: in type_check_inputs_or_outputs
          if pvalue_.element_type is None:
      ../pvalue.py:241: in __getattr__
          return self[tag]
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
      tag = 'element_type'
      
          def __getitem__(self, tag):
            # Accept int tags so that we can look at Partition tags with the
            # same ints that we used in the partition function.
            # TODO(gildea): Consider requiring string-based tags everywhere.
            # This will require a partition function that does not return ints.
            if isinstance(tag, int):
              tag = str(tag)
            if tag == self._main_tag:
              tag = None
            elif self._tags and tag not in self._tags:
              raise ValueError(
                  "Tag '%s' is neither the main tag '%s' "
                  "nor any of the tags %s" % (
                      tag, self._main_tag, self._tags))
            # Check if we accessed this tag before.
            if tag in self._pcolls:
              return self._pcolls[tag]
          
            if tag is not None:
              self._transform.output_tags.add(tag)
              pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
              # Transfer the producer from the DoOutputsTuple to the resulting
              # PCollection.
      >       pcoll.producer = self.producer.parts[0]
      E       AttributeError: 'NoneType' object has no attribute 'parts'
      
      ../pvalue.py:266: AttributeError
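
The traceback suggests the following mechanism: the type check reads `pvalue_.element_type`, `__getattr__` forwards the unknown attribute name to `__getitem__` as if it were an output tag, and `__getitem__` then dereferences a producer that is still `None`. Below is a minimal sketch of that failure mode. `OutputsTupleSketch`, `Producer`, and `probe_element_type` are hypothetical stand-ins written for illustration, not Beam's `DoOutputsTuple`, and the assumption that the producer is attached only after type checking is inferred from the error message alone.

```python
class Producer:
    """Hypothetical stand-in for an applied transform with output parts."""

    def __init__(self, parts):
        self.parts = parts


class OutputsTupleSketch:
    """Hypothetical stand-in for a multi-output result; not Beam's class."""

    def __init__(self, tags=()):
        self._tags = tags
        self._pcolls = {}
        # Assumption: the producer is attached only after type checking runs.
        self.producer = None

    def __getitem__(self, tag):
        if self._tags and tag not in self._tags:
            raise ValueError("unknown tag %r" % (tag,))
        if tag in self._pcolls:
            return self._pcolls[tag]
        # Mirrors the failing line: the producer is dereferenced unconditionally.
        part = self.producer.parts[0]
        self._pcolls[tag] = part
        return part

    def __getattr__(self, tag):
        # Called only when normal lookup fails, so any unknown attribute
        # name (such as 'element_type') is treated as an output tag.
        return self[tag]


def probe_element_type(outputs):
    """Return the error message raised by reading .element_type, or None."""
    try:
        outputs.element_type
    except AttributeError as exc:
        return str(exc)
    return None
```

In this sketch, `probe_element_type(OutputsTupleSketch())` returns the `AttributeError` message about `parts`, while an instance whose `producer` has been set resolves `element_type` as though it were a tag. That points to two possible directions for a fix, neither confirmed here: give the tuple a real `element_type` attribute, or make the tag lookup tolerate a missing producer.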
      

          People

            Assignee: Unassigned
            Reporter: Udi Meiri (udim)