Kafka / KAFKA-16573

Streams does not specify where a Serde is needed

Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.7.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Example topology:

       builder
         .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
         .groupBy((key, value) => new KeyValue(value, key))
         .count()
         .toStream()
         .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
       

      At runtime, we get the following exception:

      Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
      org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
          at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
          at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
          at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
          at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
          at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
          at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
          at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
          at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)

      The error message does not identify the line of code or the processor that caused the failure.

      Here, a Grouped was missing from the groupBy call. Because the groupBy API does not require a Grouped argument, the omission is easy to overlook and can be difficult to spot in a more complex topology.
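
For illustration, here is a minimal sketch of the same topology with the missing Grouped supplied. The names GroupedSerdeExample, buildTopology and swap are hypothetical and introduced only for this sketch; the topic names and serdes come from the example above. It assumes the kafka-streams library is on the classpath and that String serdes are the right choice for the regrouped key and value.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Grouped, KeyValueMapper, Produced}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object GroupedSerdeExample {

  // Same selector as in the report: swap key and value before grouping.
  val swap: KeyValueMapper[String, String, KeyValue[String, String]] =
    (key, value) => new KeyValue(value, key)

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()

    builder
      .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
      // Grouped passes explicit serdes for the new key and value,
      // so no default key serde has to be configured for this step.
      .groupBy(swap, Grouped.`with`(Serdes.String(), Serdes.String()))
      .count()
      .toStream()
      .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))

    builder.build()
  }

  def main(args: Array[String]): Unit =
    // Describing the topology does not require a running broker.
    println(buildTopology().describe())
}
```

Building and describing the topology needs no broker. With the Grouped in place, the serdes for the regrouped data are given explicitly, so the default key serde lookup shown in the stack trace should no longer be needed for this topology.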

      This is also a problem for anyone who needs explicit control over the serdes in the topology and does not want to define default serdes.

       

        


            People

              Assignee: Ayoub Omari (ayoubomari)
              Reporter: Ayoub Omari (ayoubomari)
              Votes: 0
              Watchers: 3
