Details
-
Bug
-
Status: Resolved
-
P2
-
Resolution: Fixed
-
None
-
None
-
None
Description
When run using SDF the pipeline does not commit offsets but when run using the SDF UnboundedSourceWrapper via use_deprecated_read experiment the pipeline does. This implies that the UnboundedSource version is able to correctly commit offsets but the pure SDF does not.
Sample code:
final Pipeline p = Pipeline.create(options); p.apply( KafkaIO.<Long, String>read() .withBootstrapServers(options.getKafkaBroker()) .withTopic(options.getTopic()) .withConsumerConfigUpdates( Map.of( ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(), CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL", SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka", SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.GssLoginModule required initiate=true;")) .withKeyDeserializer(LongDeserializer.class) .withValueDeserializer(StringDeserializer.class) .commitOffsetsInFinalize() .withoutMetadata());
Attachments
Issue Links
- links to