Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
2.4.4
-
None
Description
When using PySpark to read from Kafka, there is a race condition that Spark may use KafkaConsumer in multiple threads at the same time and throw the following error:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059) at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451) at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126) at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130) at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131) at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144) at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
When using PySpark, reading from Kafka is actually happening in a separate writer thread rather that the task thread. When a task is early terminated (e.g., there is a limit operator), the task thread may stop the KafkaConsumer when the writer thread is using it.