Details
- Type: Bug
- Status: Open
- Priority: P3
- Resolution: Unresolved
- Affects Version: 2.10.0
- Fix Version: None
- Component: None
Description
My working environment:
- Apache Beam Java SDK version: works with 2.9.0 but fails with 2.10.0
- Runner: fails with both the Direct Runner and the Dataflow Runner
- Application code: Scala (note: I did not use Scio)
I tried upgrading the Apache Beam Java SDK from 2.9.0 to 2.10.0 and deploying to Dataflow, but I got the error below. The same code works with 2.9.0. Am I missing something?
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.beam.model.pipeline.v1.RunnerApi$BeamConstants$Constants.getValueDescriptor()Lorg/apache/beam/vendor/grpc/v1p13p1/com/google/protobuf/Descriptors$EnumValueDescriptor;
at org.apache.beam.sdk.transforms.windowing.BoundedWindow.extractTimestampFromProto(BoundedWindow.java:84)
at org.apache.beam.sdk.transforms.windowing.BoundedWindow.<clinit>(BoundedWindow.java:49)
at org.apache.beam.sdk.coders.CoderRegistry$CommonTypes.<init>(CoderRegistry.java:140)
at org.apache.beam.sdk.coders.CoderRegistry$CommonTypes.<init>(CoderRegistry.java:97)
at org.apache.beam.sdk.coders.CoderRegistry.<clinit>(CoderRegistry.java:160)
at org.apache.beam.sdk.Pipeline.getCoderRegistry(Pipeline.java:326)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:707)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:309)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:182)
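A java.lang.NoSuchMethodError on a vendored class like this often means two different Beam versions ended up on the classpath (e.g. one module still resolving to 2.9.0 while the core is 2.10.0). A minimal sketch, assuming an sbt build, of pinning every Beam artifact the pipeline uses to the same version:

```scala
// build.sbt fragment (sketch; assumes sbt -- the exact module list
// should match whatever the project actually depends on)
val beamVersion = "2.10.0"

libraryDependencies ++= Seq(
  "org.apache.beam" % "beam-sdks-java-core"                     % beamVersion,
  "org.apache.beam" % "beam-runners-direct-java"                % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-kafka"                 % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion
)
```

After pinning, `sbt dependencyTree` (or `mvn dependency:tree` for Maven) can confirm that no transitive dependency still drags in a 2.9.0 Beam artifact.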
My code is in Scala; it works fine with Beam 2.9.0:
val p = Pipeline.create(options)
p.apply(s"${bu.name}_ReadFromKafka",
    KafkaIO.read()
      .withBootstrapServers(options.getBootstreapServers)
      .updateConsumerProperties(config)
      .withTopics(util.Arrays.asList(topicName))
      .withKeyDeserializer(classOf[LongDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .withConsumerFactoryFn(
        new KafkaTLSConsumerFactory(
          projectId,
          options.getSourceBucket,
          options.getTrustStoreGCSKey,
          options.getKeyStoreGCSKey)))
  .apply(s"${bu.name}_Convert", ParDo.of(new ConvertJSONTextToEPCTransaction(bu)))
  .apply(s"${bu.name}_WriteToBQ",
    BigQueryIO.write()
      .to(bqDestTable)
      .withSchema(schema)
      .withFormatFunction(new ConvertMessageToTable())
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))
p.run
According to the stack trace, it fails in this method:
https://github.com/apache/beam/blob/v2.10.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L81-L85
private static Instant extractTimestampFromProto(RunnerApi.BeamConstants.Constants constant) {
  return new Instant(
      Long.parseLong(
          constant.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant)));
}
The constant comes from here:
https://github.com/apache/beam/blob/v2.10.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48-L49
public static final Instant TIMESTAMP_MIN_VALUE = extractTimestampFromProto(RunnerApi.BeamConstants.Constants.MIN_TIMESTAMP_MILLIS);
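One way to check whether mixed jars are the cause is to print which jar a suspect class was actually loaded from. A small sketch (the `WhereFrom` helper is hypothetical, not part of Beam):

```scala
// Sketch: report the jar (CodeSource) a class was loaded from.
// Running this against the Beam model/proto classes on the failing
// classpath should reveal if they come from different version jars.
object WhereFrom {
  def locate(cls: Class[_]): String = {
    val src = cls.getProtectionDomain.getCodeSource
    // JDK bootstrap classes have no CodeSource
    if (src == null) "bootstrap classloader" else src.getLocation.toString
  }

  def main(args: Array[String]): Unit = {
    // With Beam on the classpath, one would check e.g.:
    // println(locate(Class.forName("org.apache.beam.model.pipeline.v1.RunnerApi")))
    // println(locate(Class.forName("org.apache.beam.sdk.transforms.windowing.BoundedWindow")))
    println(locate(classOf[scala.Option[_]]))
  }
}
```

If the two classes resolve to jars from different Beam versions, the NoSuchMethodError follows directly, because 2.10.0's BoundedWindow expects the 2.10.0 generated RunnerApi with the vendored-protobuf `getValueDescriptor()` signature.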