Spark / SPARK-21561

spark-streaming-kafka-010 DStream is not pulling anything from Kafka


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: DStreams

    Description

      I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka topic (broker version 0.10). I have checked that messages are being produced and have pulled them successfully with a plain KafkaConsumer. However, when I try to use the Spark Streaming API, I get nothing. If I just use KafkaUtils.createRDD and specify some offset ranges manually, it works (a sketch of that call is at the end of this description). But when I try to use createDirectStream, all the RDDs are empty, and when I check the partition offsets it simply reports that all partitions are 0. Here is what I tried:

   import org.apache.kafka.common.serialization.StringDeserializer
   import org.apache.spark.{SparkConf, TaskContext}
   import org.apache.spark.streaming.{Seconds, StreamingContext}
   import org.apache.spark.streaming.kafka010._
   import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
   import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

   val sparkConf = new SparkConf().setAppName("kafkastream")
   val ssc = new StreamingContext(sparkConf, Seconds(3))
   val topics = Array("my_topic")

   val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "hostname:6667",
     "key.deserializer" -> classOf[StringDeserializer],
     "value.deserializer" -> classOf[StringDeserializer],
     "group.id" -> "my_group",
     "auto.offset.reset" -> "earliest",
     "enable.auto.commit" -> (true: java.lang.Boolean)
   )

   val stream = KafkaUtils.createDirectStream[String, String](
     ssc,
     PreferConsistent,
     Subscribe[String, String](topics, kafkaParams)
   )

   stream.foreachRDD { rdd =>
     // Log the offset range each partition of this batch covers.
     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     rdd.foreachPartition { iter =>
       val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
       println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
     }

     val rddCount = rdd.count()
     println(s"rdd count: $rddCount")

     // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
   }

   ssc.start()
   ssc.awaitTermination()


      All partitions show offset ranges from 0 to 0 and all RDDs are empty. I would like the stream to start from the beginning of each partition and then keep picking up everything that is produced to it.

      I have also tried spark-streaming-kafka-0.8 and it does work, so I think this is an issue with the 0.10 integration, since everything else works fine. Thank you!
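
      For reference, here is roughly what the working batch read looked like. This is only a minimal sketch: the offset range (0 to 100 on partition 0) and the separate SparkContext setup are placeholders rather than the exact values from my run, and in my test I reused the same kafkaParams as above.

   import scala.collection.JavaConverters._
   import org.apache.kafka.common.serialization.StringDeserializer
   import org.apache.spark.{SparkConf, SparkContext}
   import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}
   import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

   val sc = new SparkContext(new SparkConf().setAppName("kafkardd"))

   val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "hostname:6667",
     "key.deserializer" -> classOf[StringDeserializer],
     "value.deserializer" -> classOf[StringDeserializer],
     "group.id" -> "my_group"
   )

   // topic, partition, inclusive starting offset, exclusive ending offset (placeholder values)
   val offsetRanges = Array(OffsetRange("my_topic", 0, 0, 100))

   // createRDD takes a java.util.Map for the consumer config, hence asJava.
   val rdd = KafkaUtils.createRDD[String, String](
     sc, kafkaParams.asJava, offsetRanges, PreferConsistent)
   println(s"batch rdd count: ${rdd.count()}")

      This batch read does return records, which is why I think the problem is specific to createDirectStream rather than the brokers or the consumer configuration.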


          People

            Assignee: Unassigned
            Reporter: Vlad Badelita (vladbadelita)