Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version: 1.0.0
- Fix Version: None
Description
I did quite a bit of testing around the Avro upgrades, and the behavior was not what I expected when an Avro record is used as the key of a global table backed by a RocksDB store.
setup:
- local Confluent suite 4.0.2
- test with stream app and producer (v 1.0.0)
- key schemas:

schema version @1
{
  "namespace": "com.bell.cts.livecms.livemedia.topic",
  "type": "record",
  "name": "EventKey",
  "fields": [
    { "name": "keyAttribute1", "type": "string" }
  ]
}

schema version @2
{
  "namespace": "com.bell.cts.livecms.livemedia.topic",
  "type": "record",
  "name": "EventKey",
  "fields": [
    { "name": "keyAttribute1", "type": "string" },
    { "name": "keyAttribute2", "type": ["null", "string"], "default": null }
  ]
}
- TEST1 (PASS)
- using schema version @1
- produce record1=[k@1, v@1]
- stream app loads record1 into the global table and stores it locally in RocksDB
- asyncAssert that store.get(k@1)=v@1 : PASS
- TEST2 (PASS)
- using schema version @1
- delete local store (and checkpoint)
- stream app loads record1 into the global table and stores it locally in RocksDB
- asyncAssert that store.get(k@1)=v@1 : PASS
- TEST3 (FAIL)
- using schema version @2
- keep local store
- stream app does not reload record1 from the topic because of the local offset
- asyncAssert that store.get(k@1)=v@1 : FAIL
- however store.all().next().key.equals(k@2) is true, as the key is deserialized using schema version 2
- this would be explained by the RocksDB store having the key persisted with the schema-registry magic byte and ID of schema version 1
- Not ideal, but I could consider it acceptable to delete the local store in this case.
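A plausible mechanism for the TEST3 miss (my assumption, not confirmed against the serializer source): the Confluent Avro serializer frames every key as a magic byte, a 4-byte schema-registry ID, and the Avro payload, so the same logical key serialized under schema @1 and under schema @2 produces different byte strings. A minimal sketch (class and method names, and the schema IDs 1 and 2, are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatDemo {

    // Frame a payload the way the Confluent Avro serializer does:
    // one magic byte (0x0), a 4-byte schema-registry ID, then the Avro payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0x0);      // magic byte
        buf.putInt(schemaId);     // schema-registry ID
        buf.put(avroPayload);     // Avro binary encoding of the key
        return buf.array();
    }

    public static void main(String[] args) {
        // The same key body, framed under two different registered schema IDs.
        byte[] avroBody = "key-attribute-1".getBytes();
        byte[] storedKey = frame(1, avroBody); // what TEST1/TEST2 persisted in RocksDB
        byte[] lookupKey = frame(2, avroBody); // what store.get(k@1) serializes after the upgrade
        System.out.println("byte-equal? " + Arrays.equals(storedKey, lookupKey)); // prints "byte-equal? false"
    }
}
```

Since RocksDB matches keys byte-for-byte, a lookup framed with the new schema ID can never hit an entry that was persisted with the old one.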
- TEST4 (FAIL)
- using schema version @2
- delete local store (and checkpoint)
- stream app loads record1 (produced with schema @1) into the global table and stores it locally in RocksDB
- asyncAssert that store.get(k@2)=v@2 : FAIL
- however store.all().next().key.equals(k@2) is true, as the key is deserialized using schema version 2
- I can't quite understand this one. I would have expected the RocksDB store to now be provisioned with a serialized version of the record based on schema v2, as it went through the stream app underpinning the store materialization.
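One hedged explanation for TEST4 (an assumption about Kafka Streams internals, not verified here): when a global table is restored from the topic, the raw key bytes may be copied into RocksDB as-is rather than being re-serialized through the app's current serde, so the rebuilt store still holds schema @1 framing. A toy simulation contrasting the two strategies (all names illustrative; ByteBuffer keys stand in for RocksDB's byte-wise comparison):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class RestoreDemo {

    // Confluent-style framing: magic byte, 4-byte schema ID, Avro body.
    static byte[] framed(int schemaId, byte[] avroBody) {
        ByteBuffer b = ByteBuffer.allocate(5 + avroBody.length);
        b.put((byte) 0).putInt(schemaId).put(avroBody);
        return b.array();
    }

    public static void main(String[] args) {
        byte[] body = "key-attribute-1".getBytes();
        byte[] topicKey = framed(1, body);  // record1 as produced under schema @1
        byte[] lookupKey = framed(2, body); // store.get(k@2) serialized by the app under schema @2

        // Strategy A (suspected): restoration copies the raw topic bytes into the store.
        Map<ByteBuffer, String> rawRestore = new HashMap<>();
        rawRestore.put(ByteBuffer.wrap(topicKey), "value-1");
        System.out.println("raw-restore hit?   " + rawRestore.containsKey(ByteBuffer.wrap(lookupKey)));   // false

        // Strategy B (expected above): re-serialize the key with the current schema before storing.
        Map<ByteBuffer, String> reserialized = new HashMap<>();
        reserialized.put(ByteBuffer.wrap(lookupKey), "value-1");
        System.out.println("re-serialized hit? " + reserialized.containsKey(ByteBuffer.wrap(lookupKey))); // true
    }
}
```

Under strategy A, deleting the local store does not help: the rebuilt store is byte-identical to the one it replaced.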
- TEST5 (FAIL)
- using schema version @2
- produce record2=[k@2, v@2] (meant to be backward compatible and logically equals to record1)
- stream app processes record1 (produced with schema @1) and record2 (produced with schema @2) and materializes the global table stored locally in RocksDB
- asyncAssert that store.get(k@2)=v@2 : PASS, but the store now has 2 entries!
- it looks as if the stream.groupBy(key) of the topic underpinning the global table materialization did not group the two record keys together, although record1.key.equals(record2.key) is true in Java (verified by looping over the store)
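The two entries in TEST5 are consistent with the store comparing keys as raw bytes: record1's key (framed with schema ID 1) and record2's key (framed with schema ID 2, plus whatever extra bytes the null union field encodes to) are distinct byte strings even though the deserialized EventKey objects are equal. A minimal sketch, using a byte-keyed map to stand in for RocksDB (names, IDs, and the body bytes are illustrative, not real Avro encodings):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TwoEntriesDemo {

    // Confluent-style framing: magic byte, 4-byte schema ID, Avro body.
    static byte[] framed(int schemaId, byte[] avroBody) {
        ByteBuffer b = ByteBuffer.allocate(5 + avroBody.length);
        b.put((byte) 0).putInt(schemaId).put(avroBody);
        return b.array();
    }

    public static void main(String[] args) {
        // A byte-keyed map standing in for the RocksDB store.
        Map<ByteBuffer, String> store = new HashMap<>();
        byte[] body1 = "key-attribute-1".getBytes();          // stand-in for the schema @1 encoding
        byte[] body2 = ("key-attribute-1" + "\0").getBytes(); // stand-in for the schema @2 encoding (extra byte for the null branch)
        store.put(ByteBuffer.wrap(framed(1, body1)), "value-1"); // record1
        store.put(ByteBuffer.wrap(framed(2, body2)), "value-1"); // record2, logically equal key
        System.out.println("entries: " + store.size()); // prints "entries: 2"
    }
}
```

Even if the Avro bodies happened to be byte-identical, the differing schema IDs in the framing alone would keep the two keys apart.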
Reading from the upstream raw topic throughout the testing:

/tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://127.0.0.1:8081 --property print.key=true --from-beginning
{"keyAttribute1":"key-attribute-1"}    {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"}    {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"}    {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1","keyAttribute2":null}    {"valueAttribute1":"value-1"}