Spark / SPARK-18779

Messages are received from only one partition when using the Spark Streaming integration for Kafka 0.10 with Kafka client library 0.10.1

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      I apologize for the earlier description, which wasn't very clear about the issue. Here is a detailed description of the problem and my use case:

      I have a Spark application that consumes Kafka messages using the Spark Kafka 0.10 integration. I now need to stop the application and have the user specify a timestamp in the past from which it should resume reading messages (replaying messages). The timestamp is mapped to a Kafka offset using the 'offsetsForTimes' API in KafkaConsumer, introduced in the 0.10.1.0 Kafka client. That offset is then used to create the DStream.

      Because Kafka 0.10.0.1 does not have the 'offsetsForTimes' API, I need to use Kafka 0.10.1.0.
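
The timestamp-to-offset lookup described above could be sketched roughly as follows. This is a minimal illustration, not the code from the application: the helper name `offsetsForTimestamp` and its parameters are hypothetical, it assumes kafka-clients 0.10.1.0 or later on the classpath, and it assumes `kafkaParams` already contains the bootstrap servers and String deserializers.

```scala
import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not from the report): resolves a replay timestamp
// to one starting offset per partition of the given topic.
def offsetsForTimestamp(
    kafkaParams: Map[String, Object],
    topic: String,
    timestampMs: Long): Map[TopicPartition, Long] = {
  // Short-lived consumer used only for the metadata and offset lookup.
  val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
  try {
    // Ask for the same timestamp on every partition of the topic.
    val query = new JHashMap[TopicPartition, JLong]()
    consumer.partitionsFor(topic).asScala.foreach { info =>
      query.put(new TopicPartition(info.topic(), info.partition()), JLong.valueOf(timestampMs))
    }
    // offsetsForTimes maps a partition to null when it has no message at or
    // after the timestamp; such partitions are skipped here.
    consumer.offsetsForTimes(query).asScala.collect {
      case (tp, offsetAndTimestamp) if offsetAndTimestamp != null =>
        tp -> offsetAndTimestamp.offset()
    }.toMap
  } finally {
    consumer.close()
  }
}
```

The resulting map could then be supplied as the starting offsets when the DStream is created. Partitions that are skipped because no offset was found would need a fallback of the caller's choosing.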

      To achieve that, I replaced the 0.10.0.1 jar in the Spark environment with the 0.10.1.0 jar. Things started working, but the application could read messages only from the first partition.

      To reproduce the issue, I wrote a local program with the 0.10.1.0 jar on the classpath:

      ********************************
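      import scala.collection.mutable.HashMap
      import org.apache.kafka.common.TopicPartition
      import org.apache.spark.streaming.kafka010.KafkaUtils
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers

      // ssc (StreamingContext) and kafkaParams (Map[String, Object]) are defined elsewhere in the program
      val topics = Set("Z1Topic")
      // Offsets are hardcoded to 10 instead of being obtained from 'offsetsForTimes'
      val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
      topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 0), 10L)
      topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 1), 10L)

      val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferBrokers, Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
      val values = stream.map(record => record.value())
      values.print()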
      ********************************

      This printed only the messages in the first partition, starting from offset 10. (This is with the 0.10.1.0 client.)
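
One way to confirm which partitions actually contribute to each batch is to log the offset ranges of the direct stream. The sketch below is an illustration only: the function name `logOffsetRanges` is hypothetical, and it assumes it is called on the stream returned by `createDirectStream` (not on a mapped stream, whose RDDs no longer carry offset ranges).

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.HasOffsetRanges

// Hypothetical diagnostic (not from the report): prints, for every batch,
// the offset range consumed from each topic partition.
def logOffsetRanges(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
  stream.foreachRDD { rdd =>
    // RDDs produced by the direct stream expose their per-partition offset ranges.
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
      println(s"topic=${r.topic} partition=${r.partition} from=${r.fromOffset} until=${r.untilOffset}")
    }
  }
```

If only one partition appears in the output, or the other partitions show empty ranges (from equal to until), that would match the behavior described here.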

      If I use the Kafka 0.10.0.1 client for the above program, things work fine and I receive messages from all partitions, but I can't use the 'offsetsForTimes' API (because it doesn't exist in the 0.10.0.1 client).

            People

              Assignee: Unassigned
              Reporter: Pranav Nakhe (pnakhe)
              Votes: 0
              Watchers: 2
