Details
- Type: Bug
- Status: Open
- Priority: P3
- Resolution: Unresolved
Description
We've been using KafkaIO's withValueDeserializerAndCoder method to provide a Beam Row coder for many versions of Beam. However, it stopped working in 2.30, after the ReadFromKafkaViaSDF implementation was made the default in BEAM-12114.
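For reference, the pattern that has worked until now looks roughly like the sketch below. KafkaIO.read(), withValueDeserializerAndCoder(), and RowCoder.of() are the real Beam APIs; the topic name, schema, and the RowDeserializer class are illustrative placeholders (a real pipeline would supply its own Deserializer<Row> implementation):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadRowsFromKafka {
  public static void main(String[] args) {
    // Illustrative schema; a real pipeline would use its own.
    Schema schema = Schema.builder().addStringField("name").build();

    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<String, Row>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("events") // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        // RowDeserializer is a hypothetical custom Deserializer<Row>.
        // The coder passed here is honored by ReadFromKafkaViaUnbounded
        // but, per this report, ignored by ReadFromKafkaViaSDF.
        .withValueDeserializerAndCoder(RowDeserializer.class, RowCoder.of(schema)));
    p.run();
  }
}
```

This compiles only against the Beam SDK and a user-supplied deserializer; it is meant to show where the coder enters the API, not to be a complete pipeline.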
As far as I can see, ReadFromKafkaViaUnbounded simply uses the key and the value coders that were passed with withValueDeserializerAndCoder.
But ReadFromKafkaViaSDF relies on an internal ReadSourceDescriptors class which, for some reason, doesn't receive the provided coders directly. Instead, it goes through DeserializerProvider, which uses the CoderRegistry to look up the coders. BEAM-9569 added a check that practically prevents the CoderRegistry from being used with Beam Row objects. This prevents us from using Beam Rows with KafkaIO.
As a workaround, we can use --experiments=beam_fn_api_use_deprecated_read to force the ReadFromKafkaViaUnbounded implementation, but I'm afraid it will eventually be removed.
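Concretely, the workaround is just passing the experiment flag as a pipeline option (the jar name and runner below are placeholders; only the --experiments flag comes from this report):

```shell
# Force the deprecated ReadFromKafkaViaUnbounded read implementation.
# Jar name and runner are illustrative; the flag is what matters.
java -jar my-pipeline.jar \
  --runner=DataflowRunner \
  --experiments=beam_fn_api_use_deprecated_read
```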
So, I feel like either:
- The existing KafkaIO documentation or examples need to be updated to show how to rely on schemas when using Beam Rows, as was suggested in BEAM-9569. But I don't see how that can work with the existing implementation of KafkaIO.
- Or ReadFromKafkaViaSDF needs to use the provided coders and not fall back to DeserializerProvider.