Details
Bug
Status: Resolved
P2
Resolution: Won't Fix
2.28.0
None
IntelliJ community version, Maven, Windows, Dataflow version 2.28.0
Patch, Important
Description
Hello,
Our team is having trouble launching a streaming Dataflow Kafka job from IntelliJ when the job runs in a private subnet.
Our hypothesis is that at graph construction time [0], Beam executes part of the pipeline code locally and checks all connections. In our case, we cannot reach the subnet from IntelliJ or from the Cloud console. We can reach it only from a Compute Engine instance created within that subnet.
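One way to confirm the network half of this hypothesis is to try a plain TCP connection to each bootstrap server from the machine that runs IntelliJ. The following is a minimal sketch using only the JDK. The class name `KafkaReachability`, the default `localhost:9092`, and the 3-second timeout are assumptions for illustration, and a successful TCP connect only shows that the port is reachable, not that the Kafka client can fetch topic metadata.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam or the Kafka client.
class KafkaReachability {

    /** Splits a "host1:port1,host2:port2" bootstrap string into unresolved addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout, false otherwise. */
    static boolean canConnect(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so that DNS failures are also reported as "not reachable".
            socket.connect(
                new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same value that options.getBootstrapServers() would return.
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(bootstrap)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                + " reachable=" + canConnect(address, 3000));
        }
    }
}
```

If this reports `reachable=false` from the developer machine but `reachable=true` from a Compute Engine instance inside the subnet, that is consistent with the timeout coming from a local connection attempt.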
We reached out to Google support, and they suggested that we raise a defect with you. The following code throws a timeout error when we run it from IntelliJ:
pipeline.apply("Read Kafka",
    KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
            ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        // .withMaxNumRecords(5)
)
But if we uncomment .withMaxNumRecords(5), the code works and we can spin up a Dataflow job in the desired subnet to ingest the Kafka stream:
pipeline.apply("Read Kafka",
    KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(propertyBuilder)
        .withConsumerConfigUpdates(
            ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topicsList)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        .withMaxNumRecords(5)
)
The problem with the code above is that the Dataflow job stops after ingesting the given number of records, so it behaves like a batch ingestion rather than a streaming one, which is not what we want.
Google support team hypothesis:
The current hypothesis is that the failure happens in `KafkaUnboundedSource.split()` [1], which fails because it cannot retrieve topic information.
First, `withMaxNumRecords` is intended for testing [2]. When it is specified, the unbounded read is converted into a bounded read via `BoundedReadFromUnboundedSource` [3]; without `withMaxNumRecords`, the pipeline remains unbounded.
When the pipeline is bounded (`withMaxNumRecords` specified), `split()` is called on the Dataflow worker, in `SplitFn` [4]. Because it runs on Dataflow, it has no problem connecting to Kafka.
When the pipeline is unbounded (`withMaxNumRecords` commented out), `split()` is called locally while the pipeline is being built, during the graph construction phase [5][6], at which point it has no access to Kafka.
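The difference between the two cases described above can be pictured with a small toy model. This is only a sketch of the hypothesis, written in plain Java with no Beam dependency. `ToySource`, `translateEagerly`, and `translateLazily` are hypothetical names and do not correspond to Beam or Dataflow runner classes. In the model, a source's `split()` needs the broker, and the broker is reachable only from "inside the subnet".

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the hypothesis only; none of these types exist in Beam.
class SplitTimingSketch {

    /** Hypothetical stand-in for a source whose split() must contact the broker. */
    interface ToySource {
        List<String> split();
    }

    /** Builds a source whose split() fails whenever the broker is not reachable. */
    static ToySource sourceBehind(Supplier<Boolean> brokerReachable) {
        return () -> {
            if (!brokerReachable.get()) {
                throw new IllegalStateException("timeout fetching topic partitions");
            }
            return Arrays.asList("topic-0", "topic-1");
        };
    }

    /** Models the unbounded case: split() runs immediately, on the machine building the graph. */
    static List<String> translateEagerly(ToySource source) {
        return source.split();
    }

    /** Models the bounded case: split() is only wrapped here and runs later, on a worker. */
    static Supplier<List<String>> translateLazily(ToySource source) {
        return source::split;
    }

    public static void main(String[] args) {
        // false = developer machine outside the subnet, true = worker inside the subnet.
        boolean[] insideSubnet = {false};
        ToySource source = sourceBehind(() -> insideSubnet[0]);

        // Deferring the split succeeds locally because nothing contacts the broker yet.
        Supplier<List<String>> deferred = translateLazily(source);

        try {
            translateEagerly(source);
        } catch (IllegalStateException e) {
            System.out.println("eager split failed locally: " + e.getMessage());
        }

        // Simulate the job now running on a worker inside the subnet.
        insideSubnet[0] = true;
        System.out.println("deferred split on worker: " + deferred.get());
    }
}
```

In this model, the same source fails or succeeds depending only on where `split()` is invoked, which matches the behaviour reported with and without `withMaxNumRecords`.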
[0] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
[1] https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57
[2] https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-
[3] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193
[4] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169
[5] https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87
[6] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job