  Zeppelin / ZEPPELIN-2746

Scala Spark-Streaming with Kafka Integration program does not show output

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.7.0
    • Component/s: Interpreters
    • Labels: Patch, Important

    Description

      I produced 8 messages with the Kafka console producer. When I run the console consumer

      ./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning
      all 8 messages are displayed:

      ^CProcessed a total of 8 messages
      When I execute the following Spark 2 code in Zeppelin,

      %spark2
      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming._
      sc.setLogLevel("ERROR")  // prevent INFO logging from polluting output
      // create the StreamingContext with a 5-second batch interval
      val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "kafka-streaming-example",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val topics = Array("spark-streaming")
      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      messages.foreachRDD { rdd =>
        System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
        rdd.foreach { record =>
          System.out.print(record.value())
        }
      }
      ssc.start()
      

      I get

      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming._
      
      ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
      kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
      topics: Array[String] = Array(spark-streaming)
      messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
      

      There are no error messages, but none of the output from the foreachRDD block is displayed: neither the "--- New RDD" lines nor the record values. The same code works fine when run in the Spark shell.
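
To narrow this down, it may help to check whether the records reach the driver at all. The paragraph below is a diagnostic sketch, not a confirmed fix. It rests on two assumptions that this report does not verify: that the closure passed to rdd.foreach runs on the executors (so its System.out.print output would land in executor stdout rather than in the notebook), and that Zeppelin may not attach output printed by the streaming thread to the paragraph once the paragraph has returned. The sketch copies each batch's values into a driver-side queue and prints them from the paragraph's own thread. The names `received` and the 15-second wait are hypothetical choices for illustration; `sc` is the SparkContext that Zeppelin provides.

```scala
// Run as a Zeppelin paragraph with the same %spark2 prefix as the original code.
// Restart the interpreter first: output operations cannot be added to a
// StreamingContext that is already started.
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

// Same Kafka settings as in the report
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("spark-streaming"), kafkaParams)
)

// Hypothetical driver-side buffer for the values of every batch
val received = new ConcurrentLinkedQueue[String]()

messages.foreachRDD { rdd =>
  // Extract the String payload on the executors before collecting:
  // ConsumerRecord itself is not serializable.
  val values = rdd.map(_.value()).collect()
  values.foreach(v => received.add(v))
}

ssc.start()

// Keep the paragraph running for a few batches (arbitrary 15 s), then print
// from the paragraph's own thread, where Zeppelin captures output.
ssc.awaitTerminationOrTimeout(15000L)
println("Records received on the driver: " + received.size)
received.asScala.foreach(println)
```

If this paragraph lists the 8 messages, the stream itself is working and the missing output in the original code is a question of where the print statements end up. If it prints a count of 0, the problem more likely lies in the Kafka connection or consumer configuration.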

          People

            Assignee: Unassigned
            Reporter: Jyoti Biswas (jybsws)
            Votes: 3
            Watchers: 5

              Time Tracking

                Estimated: 96h
                Remaining: 96h
                Logged: Not Specified