  1. Kafka
  2. KAFKA-6713

Provide an easy way to replace a store with a custom one in the high-level Streams DSL


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.0.1
    • Fix Version/s: None
    • Component/s: streams

    Description

      I am trying to use a GlobalKTable with a custom store implementation. I would like to keep my `Category` entities in the store and query them by name as well as by key, so my custom store has capabilities beyond `get`, such as a lookup by `name`. I also want to retrieve all entries hierarchically and lazily, and I have other use cases as well.
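
      To make the requirement concrete, here is a minimal in-memory sketch of such a store. It is not the implementation from this issue: the `Category` fields, `CategoryStore`, `getByName` and `subtree` are all hypothetical names chosen for illustration, and a real store would sit behind Kafka's `KeyValueStore` rather than plain maps.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical entity; the issue does not show the real Category class.
final class Category {
    final String id;
    final String name;
    final String parentId; // null for a root category

    Category(String id, String name, String parentId) {
        this.id = id;
        this.name = name;
        this.parentId = parentId;
    }
}

// In-memory stand-in for a custom store: primary lookup by id plus two secondary indexes.
final class CategoryStore {
    private final Map<String, Category> byId = new HashMap<>();
    private final Map<String, String> idByName = new HashMap<>();
    private final Map<String, List<String>> childIds = new HashMap<>();

    void put(Category c) {
        Category old = byId.put(c.id, c);
        if (old != null) {
            // Drop stale index entries before re-indexing the updated entity.
            idByName.remove(old.name);
            if (old.parentId != null) {
                childIds.get(old.parentId).remove(old.id);
            }
        }
        idByName.put(c.name, c.id);
        if (c.parentId != null) {
            childIds.computeIfAbsent(c.parentId, k -> new ArrayList<>()).add(c.id);
        }
    }

    Category get(String id) {
        return byId.get(id);
    }

    // The extra capability beyond get: lookup through the name index.
    Category getByName(String name) {
        String id = idByName.get(name);
        return id == null ? null : byId.get(id);
    }

    // Lazily walks the subtree rooted at rootId in depth-first order;
    // children are only looked up when the iterator reaches their parent.
    Iterator<Category> subtree(String rootId) {
        final Deque<String> pending = new ArrayDeque<>();
        if (byId.containsKey(rootId)) {
            pending.push(rootId);
        }
        return new Iterator<Category>() {
            @Override
            public boolean hasNext() {
                return !pending.isEmpty();
            }

            @Override
            public Category next() {
                String id = pending.pop(); // throws NoSuchElementException when exhausted
                List<String> kids = childIds.getOrDefault(id, Collections.emptyList());
                // Push in reverse so the first child is visited first.
                for (int i = kids.size() - 1; i >= 0; i--) {
                    pending.push(kids.get(i));
                }
                return byId.get(id);
            }
        };
    }
}
```

      The point of the sketch is that neither `getByName` nor `subtree` fits the `KeyValueStore<Bytes, byte[]>` contract, which is why a typed custom store is wanted in the first place.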

       

      To accomplish this, I had to implement a custom `KeyValueBytesStoreSupplier`, a `BytesTypeConverter`, and a `DelegatingByteStore`:

       

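      import org.apache.kafka.common.utils.Bytes;
      import org.apache.kafka.streams.state.KeyValueStore;
      
      public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
      
        // Converts between the typed (outer) and byte-level (inner) representations.
        private final BytesTypeConverter<K, V> converter;
      
        // The typed custom store that actually holds the data.
        private final KeyValueStore<K, V> delegated;
      
        public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
          this.converter = converter;
          this.delegated = delegated;
        }
      
        @Override
        public void put(Bytes key, byte[] value) {
          delegated.put(converter.outerKey(key),
                        converter.outerValue(value));
        }
      
        @Override
        public byte[] putIfAbsent(Bytes key, byte[] value) {
          V previous = delegated.putIfAbsent(converter.outerKey(key),
                                             converter.outerValue(value));
          // Return the previously stored value (serialized), not the new one;
          // null means the key was absent.
          return previous == null ? null : converter.innerValue(previous);
        }
        ......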
      
      

       

       Type Converter:

      import java.util.List;
      import org.apache.kafka.streams.KeyValue;
      
      // K/V are the typed (outer) key and value; IK/IV are the inner (stored) forms.
      public interface TypeConverter<K, IK, V, IV> {
      
        IK innerKey(final K key);
      
        IV innerValue(final V value);
      
        List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
      
        List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
      
        V outerValue(final IV value);
      
        KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
      
        KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
      
        K outerKey(final IK ik);
      }
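
      For illustration, a concrete converter might look roughly like the sketch below. It is only a sketch under stated assumptions: `SimpleTypeConverter` is a reduced, hypothetical stand-in for the `TypeConverter` interface above (single-element conversions only), `StringLongConverter` is an invented name, and `java.nio.ByteBuffer` takes the place of Kafka's `Bytes` so that the example depends on nothing beyond the JDK. The attached `BytesTypeConverter.java` may well differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Reduced, hypothetical stand-in for TypeConverter: only the four
// single-element conversions, with ByteBuffer as the inner key type.
interface SimpleTypeConverter<K, IK, V, IV> {
    IK innerKey(K key);

    IV innerValue(V value);

    K outerKey(IK innerKey);

    V outerValue(IV innerValue);
}

// Converts String keys and Long values to and from their byte representations.
final class StringLongConverter
        implements SimpleTypeConverter<String, ByteBuffer, Long, byte[]> {

    @Override
    public ByteBuffer innerKey(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public byte[] innerValue(Long value) {
        // Pass null through unchanged so callers can still signal "no value".
        return value == null
                ? null
                : ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public String outerKey(ByteBuffer innerKey) {
        // duplicate() leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(innerKey.duplicate()).toString();
    }

    @Override
    public Long outerValue(byte[] innerValue) {
        return innerValue == null ? null : ByteBuffer.wrap(innerValue).getLong();
    }
}
```

      Even this reduced form needs four hand-written methods per key/value type pair, and the full interface doubles that with the list and `KeyValue` variants.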
      

       

      Unfortunately, this approach is cumbersome and hard to maintain. It would help if the Streams DSL offered an easier way to plug in a custom store.

       

      Attachments

        1. TypeConverter.java
          0.6 kB
          Cemalettin Koç
        2. DelegatingByteStore.java
          4 kB
          Cemalettin Koç
        3. BytesTypeConverter.java
          2 kB
          Cemalettin Koç


          People

            Assignee: Unassigned
            Reporter: Cemalettin Koç (cemo)
            Votes: 0
            Watchers: 5

            Dates

              Created:
              Updated: