  1. Beam
  2. BEAM-7452

Ensure all RunnerHarnesses provide a valid RunnerApi.IsBounded value on all PCollections

Details

    • Type: New Feature
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: runner-core

    Description

      Fixing this requires updating four locations:

      • Dataflow RunnerHarness
      • FNAPDoFnRunner
      • UnifiedWorker
      • Shared libraries for this proto generation, which should cover OSS runners
        + Remove the workaround in ProcessBundleHandler.java, which assumes that all PCollections are bounded if the value is not set.
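
      Making the shared proto-generation code correct amounts to ensuring the SDK-to-proto direction never emits UNSPECIFIED. The sketch below is a minimal illustration under stated assumptions, not Beam's actual translation code: ProtoIsBounded and SdkIsBounded are hypothetical nested stand-ins for RunnerApi.IsBounded and PCollection.IsBounded, and BoundednessToProto is an illustrative name.

```java
// Hypothetical sketch: maps SDK boundedness to the proto value without ever
// producing UNSPECIFIED. The enums are stand-ins, not the real Beam types.
final class BoundednessToProto {
  // Stand-in for the proto enum (RunnerApi.IsBounded); UNSPECIFIED is the unset default.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Stand-in for the SDK enum (PCollection.IsBounded).
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  private BoundednessToProto() {}

  // Exhaustive mapping: every SDK value becomes an explicit proto value.
  static ProtoIsBounded toProto(SdkIsBounded isBounded) {
    switch (isBounded) {
      case BOUNDED:
        return ProtoIsBounded.BOUNDED;
      case UNBOUNDED:
        return ProtoIsBounded.UNBOUNDED;
      default:
        throw new IllegalArgumentException("Unknown IsBounded: " + isBounded);
    }
  }

  public static void main(String[] args) {
    for (SdkIsBounded b : SdkIsBounded.values()) {
      System.out.println(b + " -> " + toProto(b));
    }
  }
}
```

      Because the method has no path that returns UNSPECIFIED, a descriptor built only through this kind of mapping would always satisfy the requirement that IsBounded is set.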

      See PCollectionTranslation.fromProto, which should always be passed a valid value, so that it neither needs to fail on a missing value nor fall back to assuming the PCollection is bounded.
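
      The error message in the stack trace indicates that fromProto rejects UNSPECIFIED outright. The sketch below shows that strict conversion in isolation, assuming hypothetical stand-in enums (ProtoIsBounded, SdkIsBounded) in place of the real RunnerApi and SDK types. It is not the actual PCollectionTranslation code.

```java
// Hypothetical sketch of a strict proto-to-SDK boundedness conversion.
final class StrictFromProto {
  // Stand-in for RunnerApi.IsBounded.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Stand-in for PCollection.IsBounded.
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  private StrictFromProto() {}

  static SdkIsBounded fromProto(ProtoIsBounded isBounded) {
    switch (isBounded) {
      case BOUNDED:
        return SdkIsBounded.BOUNDED;
      case UNBOUNDED:
        return SdkIsBounded.UNBOUNDED;
      case UNSPECIFIED:
      default:
        // UNSPECIFIED means whoever built the proto did not set the field;
        // report it instead of silently assuming BOUNDED.
        throw new IllegalArgumentException(
            "Cannot convert unknown IsBounded to SDK IsBounded: " + isBounded);
    }
  }

  public static void main(String[] args) {
    System.out.println(fromProto(ProtoIsBounded.UNBOUNDED));
    try {
      fromProto(ProtoIsBounded.UNSPECIFIED);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

      Keeping the conversion strict puts the burden on the producers of the proto (the RunnerHarnesses), which is where this issue proposes the fix should go.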

       

      Context
      ===

      I encountered this while updating the Java SDK to conditionally serialize some elements in order to report a sampled byte size metric.

      It's due to the refactoring in my PR/8416: RehydratedComponents was pulled up a level and is now shared among all the calls to createRunnerForPTransform in the various PTransformRunnerFactories.

      It now triggers code paths that were not previously exercised for all types of PTransforms/PCollections, causing this error to occur.

      jsonPayload:

      {
        exception: "org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
            at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
            at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
            at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
            at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
            at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
            at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
            at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
            at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
            at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
            at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
            at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
            at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
            at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
            at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
            at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
            at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
            ... 17 more"
        job: "2019-05-29_03_31_14-4799355109250203557"
        logger: "org.apache.beam.fn.harness.control.BeamFnControlClient"
        message: "Exception while trying to handle InstructionRequest -28"
        portability_worker_id: "1"
        thread: "16"
        worker: "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"
      }
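
      The trace shows RehydratedComponents.getPCollection going through a (vendored Guava) LoadingCache, so each PCollection is converted lazily the first time it is looked up. The sketch below illustrates why sharing one instance across all runner factories can surface an UNSPECIFIED value that no caller had requested before. It is an assumption-laden simplification: it substitutes ConcurrentHashMap.computeIfAbsent for the Guava cache, models the components as a hypothetical Map from PCollection id to a stand-in enum, and LazyRehydration is an illustrative name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of lazy, cached rehydration of PCollection boundedness.
final class LazyRehydration {
  // Stand-in for RunnerApi.IsBounded.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Simplified component store: PCollection id -> proto boundedness.
  private final Map<String, ProtoIsBounded> components;
  // Converted values; an entry is added only after a successful conversion.
  private final Map<String, Boolean> rehydrated = new ConcurrentHashMap<>();

  LazyRehydration(Map<String, ProtoIsBounded> components) {
    this.components = components;
  }

  // Returns true if bounded. Conversion, and any failure, happens on the
  // first lookup of a given id; a failed conversion is not cached.
  boolean isBounded(String pcollectionId) {
    return rehydrated.computeIfAbsent(pcollectionId, this::convert);
  }

  private Boolean convert(String id) {
    ProtoIsBounded value = components.get(id);
    if (value == ProtoIsBounded.BOUNDED) {
      return true;
    }
    if (value == ProtoIsBounded.UNBOUNDED) {
      return false;
    }
    throw new IllegalArgumentException("PCollection " + id + " has IsBounded " + value);
  }

  public static void main(String[] args) {
    Map<String, ProtoIsBounded> comps = new HashMap<>();
    comps.put("pc-main", ProtoIsBounded.BOUNDED);
    comps.put("pc-other", ProtoIsBounded.UNSPECIFIED);
    LazyRehydration shared = new LazyRehydration(comps);

    System.out.println(shared.isBounded("pc-main")); // true
    try {
      // Harmless until some caller asks for this PCollection.
      shared.isBounded("pc-other");
    } catch (IllegalArgumentException e) {
      System.out.println("failed on first lookup: " + e.getMessage());
    }
  }
}
```

      One difference from the trace: Guava's LoadingCache.get wraps the loader's exception in UncheckedExecutionException, whereas computeIfAbsent rethrows the unchecked exception directly.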

       
      The root cause is that the ProcessBundleDescriptors are invalid: the RunnerHarnesses do not set org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded on the PCollections, which violates the specification and leads to this error.
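
      A runner harness could catch this spec violation before a descriptor is ever sent by checking every PCollection in it. The sketch below is a minimal illustration that models the descriptor's PCollections as a hypothetical Map from PCollection id to a stand-in IsBounded enum rather than the real ProcessBundleDescriptor proto. DescriptorValidator and its methods are illustrative names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical runner-side check that every PCollection has IsBounded set.
final class DescriptorValidator {
  // Stand-in for RunnerApi.IsBounded.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  private DescriptorValidator() {}

  // Returns the ids of PCollections whose boundedness was left unset.
  static List<String> unspecifiedPCollections(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, ProtoIsBounded> e : pcollections.entrySet()) {
      if (e.getValue() == null || e.getValue() == ProtoIsBounded.UNSPECIFIED) {
        missing.add(e.getKey());
      }
    }
    return missing;
  }

  // Fails fast on the runner side instead of deep inside the SDK harness.
  static void validate(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = unspecifiedPCollections(pcollections);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("PCollections missing IsBounded: " + missing);
    }
  }

  public static void main(String[] args) {
    Map<String, ProtoIsBounded> pcs = new LinkedHashMap<>();
    pcs.put("pc1", ProtoIsBounded.BOUNDED);
    pcs.put("pc2", ProtoIsBounded.UNSPECIFIED);
    System.out.println(unspecifiedPCollections(pcs)); // [pc2]
  }
}
```

      Reporting every offending id at once, rather than only the first one a lazy lookup happens to hit, makes it easier to tell which RunnerHarness code path is leaving the field unset.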
       
       

          People

            Assignee: Unassigned
            Reporter: Alex Amato (ajamato@google.com)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated: