  Beam / BEAM-6885

java.lang.IncompatibleClassChangeError when deploying Beam Java SDK 2.11.0 to Dataflow

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.11.0
    • Fix Version/s: None
    • Component/s: beam-model
    • Labels: None
    • Environment: Dataflow

    Description

      My working environment:

      • Apache Beam Java SDK version: works with 2.9.0, fails with 2.11.0
      • Runner: fails with both the Direct Runner and the Dataflow Runner
      • Application code: Scala (Scio is not used)

      I upgraded the Apache Beam Java SDK from 2.9.0 to 2.11.0 and tried to deploy the pipeline to Dataflow, but it failed with the error below. The same pipeline works with 2.9.0.

      Exception in thread "main" java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives does not implement the requested interface org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum
              at org.apache.beam.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27)
              at org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:61)
              at org.apache.beam.runners.core.construction.UnconsumedReads$1.visitValue(UnconsumedReads.java:48)
              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:674)
              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
              at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
              at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
              at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
              at org.apache.beam.runners.core.construction.UnconsumedReads.ensureAllReadsConsumed(UnconsumedReads.java:39)
              at org.apache.beam.runners.dataflow.DataflowRunner.replaceTransforms(DataflowRunner.java:979)
              at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:707)
              at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
              at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
              at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
      

      My code is written in Scala, and it works unchanged with Beam 2.9.0:

          val p = Pipeline.create(options)
          val bqDestTable = s"$projectId:$dataset.${table}_${bu.name}"
          val topicName = s"${options.getKafkaTopic}_${bu.name}"
          p.apply(s"${bu.name}_ReadFromKafka", KafkaIO.read()
              .withBootstrapServers(options.getBootstreapServers)
              .updateConsumerProperties(config)
              .withTopics(util.Arrays.asList(topicName))
              .withKeyDeserializer(classOf[LongDeserializer])
              .withValueDeserializer(classOf[StringDeserializer])
              .withConsumerFactoryFn(
                new KafkaTLSConsumerFactory(
                  projectId, options.getSourceBucket, options.getTrustStoreGCSKey, options.getKeyStoreGCSKey)))
            .apply(s"${bu.name}_Convert", ParDo.of(new ConvertJSONTextToEPCTransaction(bu)))
            .apply(s"${bu.name}_WriteToBQ", BigQueryIO.write()
              .to(bqDestTable)
              .withSchema(schema)
              .withFormatFunction(new ConvertMessageToTable())
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))
          p.run
      

      According to the stack trace, the error is raised from this class in the Beam source (BeamUrns.java):

      package org.apache.beam.runners.core.construction;
      
      import org.apache.beam.model.pipeline.v1.RunnerApi;
      import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum;
      
      /** Returns the standard URN of a given enum annotated with [(standard_urn)]. */
      public class BeamUrns {
        /** Returns the standard URN of a given enum annotated with [(standard_urn)]. */
        public static String getUrn(ProtocolMessageEnum value) {
          return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
        }
      }
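
An IncompatibleClassChangeError of this kind generally means that two classes on the runtime classpath were compiled against different versions of each other. One thing worth checking (an assumption; the report does not confirm the cause) is whether jars from more than one Beam version end up on the classpath, for example through a transitive dependency that still resolves to 2.9.0. The sketch below is a hypothetical diagnostic helper, not part of Beam: `ClasspathOrigin` and `locate` are names invented here. It prints the jar or directory each class named in the error was loaded from, using only standard JDK calls.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports where each named class was loaded from. */
final class ClasspathOrigin {

  /** Returns the code-source location of a class, or a short note if unavailable. */
  static String locate(String className) {
    try {
      // initialize=false: load the class without running its static initializers,
      // so a broken <clinit> (as in the stack trace) does not get in the way.
      Class<?> cls =
          Class.forName(className, false, ClasspathOrigin.class.getClassLoader());
      CodeSource source = cls.getProtectionDomain().getCodeSource();
      if (source == null || source.getLocation() == null) {
        return "(no code source, e.g. a JDK class)";
      }
      return source.getLocation().toString();
    } catch (ClassNotFoundException | LinkageError e) {
      return "(not loadable: " + e + ")";
    }
  }

  public static void main(String[] args) {
    // Default to the three classes named in the error message and stack trace.
    String[] names = args.length > 0 ? args : new String[] {
      "org.apache.beam.model.pipeline.v1.RunnerApi",
      "org.apache.beam.runners.core.construction.BeamUrns",
      "org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum"
    };
    for (String name : names) {
      System.out.println(name + " -> " + locate(name));
    }
  }
}
```

Run it with the same classpath as the pipeline. If the reported jar names carry different Beam version numbers, aligning all Beam artifacts to a single version would be the first thing to try.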
      
      


          People

            Assignee: Unassigned
            Reporter: Yohei Shimomae
            Votes: 0
            Watchers: 2