Details
- Type: Bug
- Status: Open
- Priority: P3
- Resolution: Unresolved
- Affects Version/s: 2.11.0
- Fix Version/s: None
- Labels: None
- Component/s: Dataflow
Description
My working environment:
- Apache Beam Java SDK version: works with 2.9.0, fails with 2.11.0
- Runner: fails with both the Direct Runner and the Dataflow Runner
- Application code: Scala (note: I did not use Scio)
I tried to upgrade the Apache Beam Java SDK from 2.9.0 to 2.11.0 and deploy to Dataflow, but I got the following error. It works with 2.9.0.
Exception in thread "main" java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives does not implement the requested interface org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum
    at org.apache.beam.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27)
    at org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:61)
    at org.apache.beam.runners.core.construction.UnconsumedReads$1.visitValue(UnconsumedReads.java:48)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:674)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.core.construction.UnconsumedReads.ensureAllReadsConsumed(UnconsumedReads.java:39)
    at org.apache.beam.runners.dataflow.DataflowRunner.replaceTransforms(DataflowRunner.java:979)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:707)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
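As a side note, one way to confirm what the first stack frame implies is a small reflection probe like the sketch below (a hypothetical helper I wrote for triage, not part of my pipeline): run it with the application classpath and `Some(false)` would confirm that the enum class and the vendored interface loaded here do not match, while `None` means one of the classes is missing entirely.

```scala
// Hypothetical classpath diagnostic, not part of the pipeline code.
object ClasspathProbe {
  // Some(true)/Some(false): both classes load and do/don't match;
  // None: one of the class names cannot be loaded at all.
  def implementsInterface(clsName: String, ifaceName: String): Option[Boolean] =
    try {
      val cls = Class.forName(clsName)
      val iface = Class.forName(ifaceName)
      Some(iface.isAssignableFrom(cls))
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit =
    // The two class names taken verbatim from the stack trace above.
    println(implementsInterface(
      "org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives",
      "org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum"))
}
```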
My code is in Scala; it works with Beam 2.9.0.
val p = Pipeline.create(options)
val bqDestTable = s"$projectId:$dataset.${table}_${bu.name}"
val topicName = s"${options.getKafkaTopic}_${bu.name}"

p.apply(s"${bu.name}_ReadFromKafka",
    KafkaIO.read()
      .withBootstrapServers(options.getBootstreapServers)
      .updateConsumerProperties(config)
      .withTopics(util.Arrays.asList(topicName))
      .withKeyDeserializer(classOf[LongDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .withConsumerFactoryFn(
        new KafkaTLSConsumerFactory(
          projectId,
          options.getSourceBucket,
          options.getTrustStoreGCSKey,
          options.getKeyStoreGCSKey)))
  .apply(s"${bu.name}_Convert", ParDo.of(new ConvertJSONTextToEPCTransaction(bu)))
  .apply(s"${bu.name}_WriteToBQ",
    BigQueryIO.write()
      .to(bqDestTable)
      .withSchema(schema)
      .withFormatFunction(new ConvertMessageToTable())
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))

p.run
The error comes from this part of Beam:
package org.apache.beam.runners.core.construction;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum;

public class BeamUrns {
  /** Returns the standard URN of a given enum annotated with [(standard_urn)]. */
  public static String getUrn(ProtocolMessageEnum value) {
    return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
  }
}
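In case it helps triage: an `IncompatibleClassChangeError` at this point often indicates Beam artifacts from different releases (and therefore different vendored-protobuf packages, e.g. `vendor.grpc.v1_13_1` vs. an older one) mixed on the same classpath. Assuming an sbt build (the module names below are the standard Beam Maven coordinates, not copied from my actual build file), keeping every Beam module on one version can be sketched like this:

```scala
// build.sbt sketch (assumption: sbt project; adjust module list to your build).
// Declaring a single beamVersion prevents accidental version skew between
// the SDK core, the runners, and the IO modules.
val beamVersion = "2.11.0"

libraryDependencies ++= Seq(
  "org.apache.beam" % "beam-sdks-java-core"                     % beamVersion,
  "org.apache.beam" % "beam-runners-direct-java"                % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-kafka"                 % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion
)
```

Inspecting the resolved tree (e.g. `sbt dependencyTree` on sbt 1.4+) would show whether any transitive dependency still drags in a 2.9.0-era Beam jar.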