Details
-
Type: Bug
-
Status: Open
-
Priority: P3
-
Resolution: Unresolved
-
None
-
None
Description
def test_typed_multi_pardo(self):
  p = TestPipeline()
  res = (p
         | beam.Create([1, 2, 3])
         | beam.Map(lambda e: e).with_outputs().with_output_types(int))
  self.assertIsNotNone(res[None].element_type)
  res_main = (res[None]
              | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
  assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
  p.run()
Fails with:
typed_pipeline_test.py:212:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../pvalue.py:113: in __or__
    return self.pipeline.apply(ptransform, self)
../pipeline.py:528: in apply
    transform.type_check_outputs(pvalueish_result)
../transforms/ptransform.py:386: in type_check_outputs
    self.type_check_inputs_or_outputs(pvalueish, 'output')
../transforms/ptransform.py:401: in type_check_inputs_or_outputs
    if pvalue_.element_type is None:
../pvalue.py:241: in __getattr__
    return self[tag]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
tag = 'element_type'

    def __getitem__(self, tag):
      # Accept int tags so that we can look at Partition tags with the
      # same ints that we used in the partition function.
      # TODO(gildea): Consider requiring string-based tags everywhere.
      # This will require a partition function that does not return ints.
      if isinstance(tag, int):
        tag = str(tag)
      if tag == self._main_tag:
        tag = None
      elif self._tags and tag not in self._tags:
        raise ValueError(
            "Tag '%s' is neither the main tag '%s' "
            "nor any of the tags %s" % (
                tag, self._main_tag, self._tags))
      # Check if we accessed this tag before.
      if tag in self._pcolls:
        return self._pcolls[tag]

      if tag is not None:
        self._transform.output_tags.add(tag)
        pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
        # Transfer the producer from the DoOutputsTuple to the resulting
        # PCollection.
>       pcoll.producer = self.producer.parts[0]
E       AttributeError: 'NoneType' object has no attribute 'parts'

../pvalue.py:266: AttributeError