KAFKA-5398: Joins on GlobalKTable don't work properly when combined with Avro and the Confluent Schema Registry

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.2.0, 0.10.2.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None
    • Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)

    Description

      Joins between a KStream and a GlobalKTable are not working as expected with the following setup:

      • Use Kafka in combination with the Confluent Schema Registry
      • Feed a topic (my-global-topic) that will be used as the GlobalKTable input by posting some messages with an Avro GenericRecord as the key (using a traditional Producer/ProducerRecord for example; a minimal producer sketch follows the schema below).
        The trivial Avro schema for the example:
        {
          "type": "record",
          "name": "AvroKey",
          "namespace": "com.test.key",
          "fields": [
            {
              "name": "anyfield",
              "type": "string"
            }
          ]
        }
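        For reference, here is a minimal sketch of such a producer (broker address, registry URL, class name and the value sent are placeholders, not taken from the real setup):

        import java.util.Properties;

        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class GlobalTopicFeeder {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                // The Confluent serializer registers the schema and prepends its id to each payload
                props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
                props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
                props.put("schema.registry.url", "http://localhost:8081");

                Schema keySchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"AvroKey\",\"namespace\":\"com.test.key\","
                    + "\"fields\":[{\"name\":\"anyfield\",\"type\":\"string\"}]}");
                GenericRecord key = new GenericData.Record(keySchema);
                key.put("anyfield", "some-key");

                try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
                    producer.send(new ProducerRecord<>("my-global-topic", key, "any-value"));
                }
            }
        }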
        
      • Start a Kafka Streams process that processes the messages, this time using an Avro SpecificRecord (AvroKey) generated by the Avro compiler for the same schema (a configuration sketch follows the schemas below):
        KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
        GlobalKTable<AvroKey, AnyObject> globalTable = builder.globalTable("my-global-topic", "my-global-topic-store");
        stream
        	.leftJoin(globalTable, (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()), (v1, v2) -> /*the ValueJoiner*/)
        	.print("Result");
        

        Note that the schema generated by Avro for the SpecificRecord differs slightly from the original one, because we configure Avro to generate String instead of CharSequence:

        {
          "type": "record",
          "name": "AvroKey",
          "namespace": "com.test.key",
          "fields": [
            {
              "name": "anyfield",
              "type": {
                "type": "string",
                "avro.java.string": "String"
              }
            }
          ]
        }
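
        Purely for completeness, a sketch of how such a Streams app is typically configured, as an assumption about the setup (application id, registry URL and class name are placeholders; SpecificAvroSerde is assumed to come from Confluent's kafka-streams-avro-serde artifact):

        import java.util.Properties;

        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStreamBuilder;

        import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

        public class GlobalJoinApp {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalktable-join-test");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                // Default serdes resolve keys/values through the Schema Registry
                // (KEY_SERDE_CLASS_CONFIG/VALUE_SERDE_CLASS_CONFIG are the 0.10.2 config names)
                props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
                props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
                props.put("schema.registry.url", "http://localhost:8081");

                KStreamBuilder builder = new KStreamBuilder();
                // ... topology from the snippet above ...
                KafkaStreams streams = new KafkaStreams(builder, props);
                streams.start();
            }
        }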
        

      Now our issue is that when the RocksDBStore backing the GlobalKTable is initialized, it uses the byte[] key straight from the topic:
      https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
      https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
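
      Paraphrasing the linked code (simplified, not a verbatim copy), the restore path hands the raw bytes to the store as-is:

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.streams.processor.StateRestoreCallback;

      class RestorePathSketch {
          // Simplified view of GlobalStateManagerImpl.restoreState in 0.10.2.1: raw consumer
          // bytes are handed straight to the store's restore callback, with no deserialization.
          static void restoreState(Iterable<ConsumerRecord<byte[], byte[]>> records,
                                   StateRestoreCallback restoreCallback) {
              for (ConsumerRecord<byte[], byte[]> record : records) {
                  // the serialized key, schema id prefix included, becomes the RocksDB key
                  restoreCallback.restore(record.key(), record.value());
              }
          }
      }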

      The schemas for the producer and the stream app differ slightly (but are compatible), so they are registered in the Schema Registry under different ids.
      Since the id is embedded in the binary representation of every key, the byte-level lookup fails during the join.
      I didn't test it, but the issue is probably broader than just this case: an upstream producer doing a schema evolution (with a backwards-compatible change) should trigger the same issue.
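
      To make the failure mode concrete: the Confluent wire format prepends a magic byte and a 4-byte schema id to the Avro payload, so two byte arrays encoding the same logical key under different registered ids never match byte-for-byte. A quick sketch of reading the id back out:

      import java.nio.ByteBuffer;

      class WireFormatSketch {
          // Confluent wire format: [magic byte 0x00][4-byte big-endian schema id][Avro payload]
          static int schemaIdOf(byte[] serialized) {
              ByteBuffer buffer = ByteBuffer.wrap(serialized);
              if (buffer.get() != 0x00) {
                  throw new IllegalArgumentException("Not Confluent-Avro-encoded data");
              }
              // The producer's bytes and the Streams app's bytes report different ids here,
              // so the byte-for-byte RocksDB lookup misses.
              return buffer.getInt();
          }
      }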

      Please note that when using a KTable instead of a GlobalKTable it works fine, because the key is first deserialized and then reserialized using the current serdes:
      https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
      https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198

      To conclude, I'm not sure I fully understand yet how all the pieces connect together for state stores, but I assume that for a GlobalKTable each key should also go through a deserialization/reserialization step before being stored in RocksDB (at least to keep the KTable and GlobalKTable behaviors coherent).
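
      A minimal sketch of that idea (hypothetical helper, assuming the source topic's serdes are available during restore): round-trip each key through the current serdes before the RocksDB put, mirroring what the KTable path already does.

      import org.apache.kafka.common.serialization.Deserializer;
      import org.apache.kafka.common.serialization.Serializer;

      class KeyNormalizationSketch {
          // Hypothetical normalization for the GlobalKTable restore path: after the round-trip,
          // the stored bytes carry the Streams app's own schema id instead of the producer's.
          static <K> byte[] normalizeKey(String topic,
                                         Deserializer<K> sourceKeyDeserializer,
                                         Serializer<K> storeKeySerializer,
                                         byte[] rawKey) {
              K key = sourceKeyDeserializer.deserialize(topic, rawKey);
              return storeKeySerializer.serialize(topic, key);
          }
      }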

    People

      Assignee: Unassigned
      Reporter: Benjamin Bargeton (benba)
      Votes: 0
      Watchers: 6
