Beam / BEAM-13320
KafkaIO: inconsistent behaviour with Beam Row coders

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Component: io-java-kafka

    Description

      We've been using KafkaIO's withValueDeserializerAndCoder method to provide a Beam Row coder for many versions of Beam. However, it stopped working in 2.30, after the ReadFromKafkaViaSDF implementation was made the default in BEAM-12114.

      As far as I can see, ReadFromKafkaViaUnbounded simply uses the key and value coders that were passed to withValueDeserializerAndCoder.

      But ReadFromKafkaViaSDF relies on an internal ReadSourceDescriptors class which, for some reason, doesn't receive the provided coders directly. Instead, it leverages DeserializerProvider, which uses the CoderRegistry to infer the coders. BEAM-9569 added a check that effectively prevents using the CoderRegistry with Beam Row objects. This prevents us from using Beam Rows with KafkaIO.

      As a workaround, we can use --experiments=beam_fn_api_use_deprecated_read to force the ReadFromKafkaViaUnbounded implementation, but I'm afraid it will eventually be deleted.
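      The workaround above is just a pipeline option. A minimal sketch of how it might be passed (the jar name and runner are assumptions for illustration, not from the report):

      ```shell
      # Force the legacy ReadFromKafkaViaUnbounded implementation instead of
      # the SDF-based read that became the default in Beam 2.30 (BEAM-12114).
      # "my-pipeline.jar" and the runner choice are placeholders.
      java -jar my-pipeline.jar \
        --runner=DirectRunner \
        --experiments=beam_fn_api_use_deprecated_read
      ```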

      So, I feel like either:

      • The existing KafkaIO documentation or examples need to be updated to show how to rely on schemas when using Beam Rows, as was suggested in BEAM-9569. But I don't see how that can work with the existing implementation of KafkaIO.
      • Or ReadFromKafkaViaSDF needs to use the provided coders and not fall back to DeserializerProvider.
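      For reference, the usage pattern the report describes looks roughly like the sketch below. The schema, topic, bootstrap servers, and the RowDeserializer class are assumptions for illustration (a user-supplied Deserializer<Row>, not part of KafkaIO); the point is the explicit RowCoder passed to withValueDeserializerAndCoder, which ReadFromKafkaViaUnbounded honours but the SDF-based read ignores in favour of CoderRegistry lookup:

      ```java
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.coders.RowCoder;
      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.values.Row;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class KafkaRowReadSketch {
        public static void main(String[] args) {
          // Hypothetical record schema, for illustration only.
          Schema schema = Schema.builder()
              .addStringField("id")
              .addInt64Field("amount")
              .build();

          Pipeline pipeline = Pipeline.create();

          pipeline.apply(
              KafkaIO.<String, Row>read()
                  .withBootstrapServers("localhost:9092") // assumption
                  .withTopic("events")                    // assumption
                  .withKeyDeserializer(StringDeserializer.class)
                  // RowDeserializer is a hypothetical user-supplied
                  // Deserializer<Row>. The explicit RowCoder here is what
                  // the legacy read path uses; the SDF path instead asks the
                  // CoderRegistry, which BEAM-9569 blocks for Row types.
                  .withValueDeserializerAndCoder(
                      RowDeserializer.class, RowCoder.of(schema)));

          pipeline.run().waitUntilFinish();
        }
      }
      ```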


      People

        Assignee: Unassigned
        Reporter: sap1ens (Yaroslav Tkachenko)
        Votes: 1
        Watchers: 4
