Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 0.10.2.0, 0.10.2.1
- Fix Version/s: None
- Component/s: None
- Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
Description
Joins between a KStream and a GlobalKTable are not working as expected when using the following setup:
- Use Kafka in combination with the Confluent Schema Registry
- Feed a topic (my-global-topic) that will be used as the GlobalKTable input by producing some messages with an Avro GenericRecord as the key (using a plain Producer/ProducerRecord for example; see the producer sketch after this list).
The trivial Avro schema for the example:
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    { "name": "anyfield", "type": "string" }
  ]
}
- Start a Kafka Streams application that processes messages, this time using an Avro SpecificRecord (AvroKey) generated by the Avro compiler for the same schema:
KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
GlobalKTable<AvroKey, AnyObject> globalTable = builder.globalTable("my-global-topic", "my-global-topic-store");
stream
    .leftJoin(globalTable,
        (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
        (v1, v2) -> /* the ValueJoiner */ v1)
    .print("Result");
Note that the schema generated by Avro for the SpecificRecord differs slightly from the original one because we use String instead of CharSequence (an Avro compiler option):
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    { "name": "anyfield", "type": { "type": "string", "avro.java.string": "String" } }
  ]
}
- Last but not least, the Confluent serializer writes the id of the schema stored in the Schema Registry into bytes 1-4 of the serialized payload (byte 0 being the magic byte):
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
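For reference, here is a minimal producer sketch for the setup above (the topic name and key schema come from this report; the broker and registry addresses, the value type, and the field value are illustrative assumptions):
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GlobalTopicFeeder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address

        // The original schema from this report (plain "string", no avro.java.string).
        Schema keySchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"AvroKey\",\"namespace\":\"com.test.key\","
          + "\"fields\":[{\"name\":\"anyfield\",\"type\":\"string\"}]}");

        GenericRecord key = new GenericData.Record(keySchema);
        key.put("anyfield", "42"); // illustrative field value

        try (KafkaProducer<Object, String> producer = new KafkaProducer<>(props)) {
            // The serialized key starts with the wire-format header described above:
            // byte 0 = magic byte, bytes 1-4 = schema id, then the Avro binary payload.
            producer.send(new ProducerRecord<>("my-global-topic", key, "any-value"));
        }
    }
}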
Now our issue is that when the RocksDBStore of the GlobalKTable is initialized, it uses the byte[] key straight from the source topic:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
The schemas for the producer and for the streams app differ slightly (but are compatible), so they are registered with different global ids. Since the id is embedded in the binary representation, the lookup fails during the join.
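To make the mismatch concrete, here is a small sketch (assuming the generated com.test.key.AvroKey class from above and a registry at http://localhost:8081) that serializes the same logical key with both schemas and compares the raw bytes:
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import com.test.key.AvroKey;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class SchemaIdMismatch {
    public static void main(String[] args) {
        Map<String, Object> cfg =
            Collections.singletonMap("schema.registry.url", "http://localhost:8081");

        // Serializer as used on the producer side (GenericRecord, original schema).
        KafkaAvroSerializer producerSide = new KafkaAvroSerializer();
        producerSide.configure(cfg, true); // isKey = true

        Schema original = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"AvroKey\",\"namespace\":\"com.test.key\","
          + "\"fields\":[{\"name\":\"anyfield\",\"type\":\"string\"}]}");
        GenericRecord genericKey = new GenericData.Record(original);
        genericKey.put("anyfield", "42");
        byte[] stored = producerSide.serialize("my-global-topic", genericKey);

        // Serializer as used on the streams side (SpecificRecord, avro.java.string schema).
        KafkaAvroSerializer streamsSide = new KafkaAvroSerializer();
        streamsSide.configure(cfg, true);
        byte[] lookup = streamsSide.serialize("my-global-topic", new AvroKey("42"));

        // Same logical key, but bytes 1-4 carry different schema ids, so the
        // byte[]-keyed RocksDB lookup cannot match them.
        System.out.println(Arrays.equals(stored, lookup)); // prints false
    }
}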
I didn't test it, but the issue is probably broader than just this case: if an upstream producer performs a schema evolution (with a backwards-compatible change), it should lead to the same issue.
Please note that when using a KTable instead of a GlobalKTable it works fine, because the key is first deserialized and then reserialized using the current serdes:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
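In other words, the KTable path effectively performs something like the following round trip (a hypothetical helper, assuming the Confluent Avro (de)serializers configured for keys and a registry at http://localhost:8081):
import java.util.HashMap;
import java.util.Map;
import com.test.key.AvroKey;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class KeyNormalization {
    // Deserialize with the app's own serde (the writer's schema id is resolved
    // via the registry and dropped), then reserialize so the resulting bytes
    // carry the app's own schema id and match what the join side produces.
    static byte[] normalizeKey(byte[] raw) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put("schema.registry.url", "http://localhost:8081");
        cfg.put("specific.avro.reader", true);

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(cfg, true); // isKey = true
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        serializer.configure(cfg, true);

        AvroKey key = (AvroKey) deserializer.deserialize("my-global-topic", raw);
        return serializer.serialize("my-global-topic", key);
    }
}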
To conclude, I'm not sure I fully understand yet how all the pieces connect together for state stores, but I assume that for a GlobalKTable each key should also be deserialized and reserialized before being stored in RocksDB (at least to make the KTable and GlobalKTable behaviors consistent).
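For illustration, the restore path could round-trip keys the same way. This is purely hypothetical code, not the actual implementation; records, keySerde, and restoreCallback are stand-ins for what GlobalStateManagerImpl already has at hand:
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateRestoreCallback;

public class RestoreSketch {
    // Hypothetical restore-time normalization, mirroring what StreamTask does:
    // instead of handing record.key() to the store verbatim, round-trip it
    // through the source's key serde so the stored bytes carry the app's
    // schema id rather than the upstream producer's.
    static void restoreWithNormalizedKeys(List<ConsumerRecord<byte[], byte[]>> records,
                                          Serde<Object> keySerde,
                                          StateRestoreCallback restoreCallback) {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            Object key = keySerde.deserializer().deserialize(record.topic(), record.key());
            byte[] normalizedKey = keySerde.serializer().serialize(record.topic(), key);
            restoreCallback.restore(normalizedKey, record.value());
        }
    }
}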