Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-13310

KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • None
    • None
    • io-java-kafka
    • None

    Description

      When run using SDF the pipeline does not commit offsets but when run using the SDF UnboundedSourceWrapper via use_deprecated_read experiment the pipeline does. This implies that the UnboundedSource version is able to correctly commit offsets but the pure SDF does not.

      Sample code:

              final Pipeline p = Pipeline.create(options);
      
              p.apply(
      
                  KafkaIO.<Long, String>read()
      
                      .withBootstrapServers(options.getKafkaBroker())
      
                      .withTopic(options.getTopic())
      
                      .withConsumerConfigUpdates(
      
                          Map.of(
      
                              ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
      
                              CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
      
                              SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
      
                              SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
      
                      .withKeyDeserializer(LongDeserializer.class)
      
                      .withValueDeserializer(StringDeserializer.class)
      
                      .commitOffsetsInFinalize()
      
                      .withoutMetadata());
      

      Attachments

        Issue Links

          Activity

            People

              johnjcasey John Casey
              lcwik Luke Cwik
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 1h 40m
                  1h 40m