ZEPPELIN-4074

Spark interpreter failed with "Not a version: 9" when using play-json


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: Interpreters
    • Labels: None

    Description

      The following Spark code connects to a Kafka broker running on my local machine (inside the DeviceHive platform) and reads sensor data messages.

      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import play.api.libs.json._
      
      case class SenseData(hash: String, value: Int, updated: String)
      
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "sensor_data",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      
      val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")
      
      
      val streamingContext = new StreamingContext(sc, Seconds(120))
      
      val stream = KafkaUtils.createDirectStream(
        streamingContext,
        PreferBrokers,
        Subscribe[String, String](topics, kafkaParams)
      )
      
      stream.map(
        record => {
          val json: JsValue = Json.parse(record.value)
          val notification = json("b")("notification")
          SenseData(
            notification("deviceId").as[String],
            notification("parameters")("t").as[Int],
            notification("timestamp").as[String]
          )
        }
      ).foreachRDD( rdd=> {
        println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
        rdd.foreach(
          record => {
            record.productIterator.foreach{ i => println(i) }
          }
        )
        rdd.toDF().registerTempTable("sensedata")
      }
      )
      
      streamingContext.start()

      When the code is added to a Zeppelin paragraph and executed, the following error is reported.

      WARN [2019-03-15 16:22:35,054] ({pool-2-thread-3} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text java.lang.NumberFormatException: Not a version: 9
      at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
      at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)

      It looks like some JDK 9 version check is performed during class loading. However, I am running JDK 8. Is there a way to fix this?
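      For context, the message is thrown by the version-parsing helper inside scala.util.Properties.isJavaAtLeast, which in scala-library releases before 2.11.12 only accepted "major.minor" strings such as "1.8"; newer scala-compiler code passes the bare string "9" when computing the boot classpath. So an older scala-library pulled in transitively (here, presumably by play-json 2.6.8) can trigger this even on JDK 8. A minimal sketch of the old parsing behaviour (adapted for illustration, not the exact library source):

```scala
// Sketch of the pre-2.11.12 version parsing in scala.util.Properties
// (names and shape adapted for illustration).
object OldVersionCheck {
  def parts(v: String): (String, String) = {
    val i = v.indexOf('.')
    // A single-component version such as "9" has no '.' and is rejected,
    // producing exactly the "Not a version: 9" seen in the stack trace.
    if (i < 0) throw new NumberFormatException("Not a version: " + v)
    (v.substring(0, i), v.substring(i + 1))
  }
}
```

      OldVersionCheck.parts("1.8") returns ("1", "8"), while OldVersionCheck.parts("9") throws NumberFormatException. Scala 2.11.12 and 2.12.4 rewrote this check to accept single-component Java 9+ version strings.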


      Some observations:

      1) I can run the exact same code in the Spark shell without any error, which suggests the issue may be in the Zeppelin code base.

      ./bin/spark-shell --packages "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,com.typesafe.play:play-json_2.11:2.7.0"
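      One way to narrow this down is to check, from a Zeppelin paragraph, which scala-library the interpreter actually loaded; a diagnostic sketch (getCodeSource can be null for bootstrap-loaded classes, hence the Option):

```scala
// Print the scala-library version and the jar it was loaded from.
println(scala.util.Properties.versionString)  // e.g. "version 2.11.12"
println(Option(scala.util.Properties.getClass.getProtectionDomain.getCodeSource)
  .map(_.getLocation.toString).getOrElse("bootstrap classpath"))
```

      If the printed version differs between the Zeppelin interpreter and the Spark shell, a transitive dependency is shadowing the expected scala-library.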

      2) This started to show up when com.typesafe.play:play-json_2.11:2.6.8 (or any version below 2.6.11) was added as a dependency to facilitate conversion between JSON and Scala objects in my code.

      3) However, if I use com.typesafe.play:play-json_2.11:2.7.0, it throws a different error, which I was able to fix by excluding conflicting dependencies.

      WARN [2019-03-15 16:11:29,203] ({pool-2-thread-2} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text warning: there was one deprecation warning; re-run with -deprecation for details
      com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
      at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
      at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
      at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
      at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
      at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
      at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
      at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:57)
      at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
      at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)

      When the following exclusions were added against com.typesafe.play:play-json_2.11:2.7.0 in the Spark interpreter dependencies, this error disappeared.

      com.fasterxml.jackson.core:jackson-databind,com.fasterxml.jackson.datatype:jackson-datatype
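      For reference, such exclusions can also be expressed in a %dep paragraph run before the Spark interpreter starts; a sketch assuming Zeppelin's dynamic dependency loading interpreter is enabled (the chained exclude calls mirror the exclusions above):

```scala
%dep
z.reset()
// Load play-json but keep Spark's own Jackson artifacts on the classpath.
z.load("com.typesafe.play:play-json_2.11:2.7.0").
  exclude("com.fasterxml.jackson.core:jackson-databind").
  exclude("com.fasterxml.jackson.datatype:jackson-datatype")
```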

      Complete stack trace:

      INFO [2019-03-15 16:22:27,440] ({pool-2-thread-3} RemoteInterpreterManagedProcess.java[start]:190) - Run interpreter process [/Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/bin/interpreter.sh, -d, /Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/interpreter/spark, -c, 10.76.26.5, -p, 62197, -r, :, -l, /Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/local-repo/spark, -g, spark]
      INFO [2019-03-15 16:22:32,991] ({pool-9-thread-1} RemoteInterpreterManagedProcess.java[callback]:123) - RemoteInterpreterServer Registered: CallbackInfo(host:10.76.26.5, port:62200)
      INFO [2019-03-15 16:22:32,995] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkInterpreter
      INFO [2019-03-15 16:22:33,170] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkSqlInterpreter
      INFO [2019-03-15 16:22:33,173] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.DepInterpreter
      INFO [2019-03-15 16:22:33,181] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.PySparkInterpreter
      INFO [2019-03-15 16:22:33,189] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.IPySparkInterpreter
      INFO [2019-03-15 16:22:33,198] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkRInterpreter
      INFO [2019-03-15 16:22:33,200] ({pool-2-thread-3} RemoteInterpreter.java[call]:142) - Open RemoteInterpreter org.apache.zeppelin.spark.SparkInterpreter
      INFO [2019-03-15 16:22:33,201] ({pool-2-thread-3} RemoteInterpreter.java[pushAngularObjectRegistryToRemote]:436) - Push local angular object registry from ZeppelinServer to remote interpreter group spark:shared_process
      WARN [2019-03-15 16:22:35,054] ({pool-2-thread-3} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text java.lang.NumberFormatException: Not a version: 9
      at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
      at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
      at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
      at scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
      at scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
      at scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
      at scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
      at scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
      at scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
      at scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
      at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
      at scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
      at scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
      at scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
      at scala.tools.nsc.Global$GlobalMirror.rootLoader(Global.scala:64)
      at scala.reflect.internal.Mirrors$Roots$RootClass.<init>(Mirrors.scala:307)
      at scala.reflect.internal.Mirrors$Roots.RootClass$lzycompute(Mirrors.scala:321)
      at scala.reflect.internal.Mirrors$Roots.RootClass(Mirrors.scala:321)
      at scala.reflect.internal.Mirrors$Roots$EmptyPackageClass.<init>(Mirrors.scala:330)
      at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass$lzycompute(Mirrors.scala:336)
      at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass(Mirrors.scala:336)
      at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass(Mirrors.scala:276)
      at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:250)
      at scala.tools.nsc.Global.rootMirror$lzycompute(Global.scala:73)
      at scala.tools.nsc.Global.rootMirror(Global.scala:71)
      at scala.tools.nsc.Global.rootMirror(Global.scala:39)
      at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:257)
      at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:257)
      at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1390)
      at scala.tools.nsc.Global$Run.<init>(Global.scala:1242)
      at scala.tools.nsc.interpreter.IMain.scala$tools$nsc$interpreter$IMain$$_initialize(IMain.scala:139)
      at scala.tools.nsc.interpreter.IMain.initializeSynchronous(IMain.scala:161)
      at org.apache.zeppelin.spark.SparkScala211Interpreter.open(SparkScala211Interpreter.scala:85)
      at org.apache.zeppelin.spark.NewSparkInterpreter.open(NewSparkInterpreter.java:102)
      at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:62)
      at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:69)
      at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:616)
      at org.apache.zeppelin.scheduler.Job.run(Job.java:188)
      at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:140)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

      Attachments

        1. Screen Shot 2019-03-17 at 9.00.25 am.png (66 kB, uploaded by Chandana Kithalagama)


        People

          Assignee: Unassigned
          Reporter: Chandana Kithalagama (ckit)
          Votes: 0
          Watchers: 4