Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.6.0, 1.6.1
Fix Version/s: None
Description
A map operation on a JavaPairDStream throws a Task not serializable exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:725)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:155)
at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:42)
at SparkKafkaConProd.writeToKafkaEfficientlyWithMutipleReceivers(SparkKafkaConProd.java:161)
at SparkKafkaConProd.main(SparkKafkaConProd.java:389)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.KafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 23 more
Steps to Reproduce:
1. Create a JavaPairDStream using the KafkaUtils.createStream API.
2. Invoke any map operation on it, e.g. JavaPairDStream.toJavaDStream().map() or JavaPairDStream.map(); the exception above is thrown. A minimal sketch of the pattern is shown below.
Reporter: Naveen