Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-5441

Portable Wordcount fails in GreedyPipelineFuser

Details

    Description

      The Python SDK wordcount with the PortableRunner throws the following exception:

      java.lang.IllegalArgumentException: A PCollectionNode should have exactly one producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, PCollection=unique_name: "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
      coder_id: "ref_Coder_FastPrimitivesCoder_2"
      is_bounded: BOUNDED
      windowing_strategy_id: "ref_Windowing_Windowing_1"
      } has [PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32, transform=spec {
        urn: "beam:transform:generic_composite:v1"
        payload: "<Reshuffle(PTransform) label=[Reshuffle]>"
      }
      subtransforms: "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
      subtransforms: "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
      subtransforms: "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
      inputs {
        key: "0"
        value: "ref_PCollection_PCollection_19"
      }
      outputs {
        key: "None"
        value: "ref_PCollection_PCollection_26"
      }
      unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
      }, PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41, transform=spec {
        urn: "urn:beam:transform:pardo:v1"
        payload: "\n\317\006\n\255\006\n beam:dofn:pickled_python_info:v1\032\210\006eNrFkttP1EAUxtsFZe1yUcEL4v3aRbdFUEFFQEHUEGpSnPhkxml32Gnstv3a6YZNbKIxJf7ZTvdB2ER9NZNMcs6cb3LO73zfaqbPEuYLTj3OupZMWZTtx2k3s/w45cYmC0PmhfxjypKEp1vxdmRAa36HXqBmkrEkjX2eZRjx20EYWrS6DeqnnElO9/PIl0GsFKPm0HsYszaV/YQbOEHqm3Gbf1ABTpYYc1E3d3R1arvTG2Tip6Z91bQfutbRtT2cckoYTaIfoFFinPRtkvE0s7vswN7iPbuaoCV5Ju0ej3p2GHh20pcijhatZTsLJG+pSb+wDs/sYzO3Fq0Va8Fq895CK+mrUot3OscL7CModgXFSvqYIPXVkHW9NlvD5G5jlGiYIrX9CKdLnGlKnHUx7VPq5UEog4hSo8MlkzI1MDNEIugmcSppN27noaJxjsz9Yxs4X+KCi4ukTpXcl5Ri9hCXXMyJSedPC/C5CnBZjJriN9W9z6SukLZ1bXYPV5wd/RBXFVKJayWu/w+kuQzCCukNMbm7XhNTTYXvpotbYkb8HUclwu0Sd1zcFQrCPRemguAUaJLGwFpUBJHMMD9sb/UwyKveFFEm4zQz3r2v3Pe2Shu4r7z9oECrgGWSRhAluRx8l8F2yHicy6PEgpMf4qGXSSy6WCrxyMXjEk8KLJtiXlQfrRR4WuCZKWxHDKqfe6o7lnayhPtUuWVVLOUSL1ysDXe9PpBvFHhZ4NWRfNMjI5VsS6zl3ie8LrDtOBJvrF+Bv0km\022\035ref_Environment_Environment_1"
      }
      inputs {
        key: "0"
        value: "ref_PCollection_PCollection_25"
      }
      outputs {
        key: "None"
        value: "ref_PCollection_PCollection_26"
      }
      unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys"
      }]
              at org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
              at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:176)
              at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:119)
              at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:82)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:67)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:89)
              at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:96)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      

      Looks like it was caused by https://github.com/apache/beam/pull/6328

      Attachments

        Issue Links

          Activity

            People

              rdub Ryan Williams
              mxm Maximilian Michels
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 3h
                  3h