Fixing this requires updating 4 locations.
- Dataflow RunnerHarness
- FNAPDoFnRunner
- UnifiedWorker
- Shared libraries for this proto generation, which should cover OSS runners
+ Remove the workaround in ProcessBundleHandler.java that assumes a PCollection is bounded when IsBounded is not set.
See PCollectionTranslation.fromProto, which should always be passed a valid value rather than having to fail with an error or assume the PCollection is bounded.
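A minimal sketch of the strict conversion described above. It assumes hypothetical local enums `ProtoIsBounded` and `SdkIsBounded` that mirror `RunnerApi.IsBounded.Enum` and `PCollection.IsBounded`; they are not the Beam classes, and `IsBoundedTranslation`, `fromProto` and `fromProtoAssumingBounded` are illustrative names only. Because an unset proto3 enum field reads as its zero value (`UNSPECIFIED`), the strict path rejects it, while `fromProtoAssumingBounded` shows the kind of defaulting workaround that hides the runner bug.

```java
// Hypothetical stand-ins that mirror the proto and SDK enums; NOT the Beam classes.
final class IsBoundedTranslation {

  /** Mirrors RunnerApi.IsBounded.Enum: UNSPECIFIED is what an unset field reads as. */
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  /** Mirrors PCollection.IsBounded, which has no "unspecified" value. */
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  /** Strict conversion: an unset value means a malformed descriptor, so reject it. */
  static SdkIsBounded fromProto(ProtoIsBounded isBounded) {
    switch (isBounded) {
      case BOUNDED:
        return SdkIsBounded.BOUNDED;
      case UNBOUNDED:
        return SdkIsBounded.UNBOUNDED;
      default:
        throw new IllegalArgumentException(
            "PCollection is_bounded must be set by the runner, got: " + isBounded);
    }
  }

  /** The style of workaround to remove: silently treats an unset value as BOUNDED. */
  static SdkIsBounded fromProtoAssumingBounded(ProtoIsBounded isBounded) {
    return isBounded == ProtoIsBounded.UNSPECIFIED
        ? SdkIsBounded.BOUNDED
        : fromProto(isBounded);
  }

  public static void main(String[] args) {
    System.out.println(fromProto(ProtoIsBounded.UNBOUNDED)); // UNBOUNDED
    System.out.println(fromProtoAssumingBounded(ProtoIsBounded.UNSPECIFIED)); // BOUNDED
    try {
      fromProto(ProtoIsBounded.UNSPECIFIED);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Keeping the strict path strict means a runner that forgets the field fails loudly, instead of an unbounded PCollection being silently processed as bounded.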
Context
===
While updating the Java SDK to conditionally serialize some elements in order to report a sampled byte size metric, I encountered this error.
It's due to the refactoring in my PR/8416: RehydratedComponents was pulled up a level and is now shared among all the calls to createRunnerForPTransform in the various PTransformRunnerFactory implementations.
This now triggers code paths that were not previously exercised for all types of PTransforms/PCollections, causing this error to occur.
```
jsonPayload:
{
exception: "org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
... 17 more
"
job: "2019-05-29_03_31_14-4799355109250203557"
logger: "org.apache.beam.fn.harness.control.BeamFnControlClient"
message: "Exception while trying to handle InstructionRequest -28"
portability_worker_id: "1"
thread: "16"
worker: "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"
}
```
The root cause is that the ProcessBundleDescriptors are invalid: the runner harnesses do not set org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded on the PCollections they send, which violates the specification and leads to this error.
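A minimal sketch of a runner-side guard that would catch this before a descriptor reaches the SDK harness. It assumes a hypothetical in-memory model (`DescriptorValidation`, `PCollectionSpec`, `findUnsetIsBounded`, `checkDescriptor` are illustrative names, not Beam or Dataflow APIs) in which each PCollection, keyed by ID, carries the value of its `is_bounded` field; a real runner harness would read these from the RunnerApi protos instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the PCollections inside a ProcessBundleDescriptor.
final class DescriptorValidation {

  /** Mirrors RunnerApi.IsBounded.Enum; UNSPECIFIED means the runner never set it. */
  enum IsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  static final class PCollectionSpec {
    final String uniqueName;
    final IsBounded isBounded;

    PCollectionSpec(String uniqueName, IsBounded isBounded) {
      this.uniqueName = uniqueName;
      this.isBounded = isBounded;
    }
  }

  /** Returns the IDs of PCollections whose is_bounded was left UNSPECIFIED. */
  static List<String> findUnsetIsBounded(Map<String, PCollectionSpec> pcollections) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, PCollectionSpec> e : pcollections.entrySet()) {
      if (e.getValue().isBounded == IsBounded.UNSPECIFIED) {
        missing.add(e.getKey());
      }
    }
    return missing;
  }

  /** Fails fast on the runner side instead of letting the SDK harness fail mid-bundle. */
  static void checkDescriptor(Map<String, PCollectionSpec> pcollections) {
    List<String> missing = findUnsetIsBounded(pcollections);
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          "ProcessBundleDescriptor has PCollections without is_bounded: " + missing);
    }
  }

  public static void main(String[] args) {
    Map<String, PCollectionSpec> pcollections = new LinkedHashMap<>();
    pcollections.put("pc1", new PCollectionSpec("Read.out", IsBounded.BOUNDED));
    pcollections.put("pc2", new PCollectionSpec("ParDo.out", IsBounded.UNSPECIFIED));
    System.out.println(findUnsetIsBounded(pcollections)); // [pc2]
  }
}
```

Reporting every offending PCollection ID at once, rather than failing on the first lookup as the shared RehydratedComponents cache does, makes it easier to see which runner code path is producing incomplete descriptors.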