Details
- Type: Improvement
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 4.0.0
- Fix Version/s: None
Description
Example application:

import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession

object SparkTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("zt-test")
      .config("spark.logConf", true)
      .getOrCreate()
    spark.sparkContext.setLogLevel("INFO")

    val threadPool = Executors.newFixedThreadPool(5)
    for (i <- 0 until 10) {
      threadPool.execute(new Task("Task " + i))
      // the thread pool is deliberately never shut down
    }

    val rdd = spark.sparkContext.makeRDD(Seq(1, 2, 4))
    val res = rdd.collect()

    // throw an exception from the user's main method
    throw new Throwable()
  }
}

class Task(private var name: String) extends Runnable {
  override def run(): Unit = {
    System.out.println("Executing task: " + name + " by " + Thread.currentThread.getName)
  }
}
When this app runs on YARN in cluster mode, the driver shuts down even though the thread pool was never shut down, so the container still stops. When it runs on Kubernetes, however, if the thread pool is not shut down, the driver pod gets stuck and never releases its resources.
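A user-side workaround, shown here only as a minimal sketch (the object name and task bodies are illustrative, not part of the reproduction above), is to shut the pool down after submitting the tasks, or to build it from daemon threads, so that no non-daemon worker threads keep the JVM alive:

import java.util.concurrent.{Executors, TimeUnit}

object PoolShutdownExample {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(5)
    for (i <- 0 until 10) {
      pool.execute(() => println(s"Executing task $i by ${Thread.currentThread.getName}"))
    }
    // Shutting the pool down lets the worker threads die once the queued
    // tasks finish, so the JVM can exit normally in both YARN and K8s modes.
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}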
In yarn-cluster mode, the ApplicationMaster wraps the user's main in System.exit, like this:
ugi.doAs(new PrivilegedExceptionAction[Unit]() {
  override def run(): Unit = System.exit(master.run())
})
So even while leaked threads are still parked, the exit code is passed to System#exit and the JVM is torn down; in this situation the AM can stop.
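The reason this works is that System.exit terminates the JVM regardless of live non-daemon threads. A minimal standalone sketch of that behavior (the object name is illustrative):

import java.util.concurrent.locks.LockSupport

object ExitDespiteParkedThreads {
  def main(args: Array[String]): Unit = {
    // A non-daemon thread that parks forever; without System.exit the JVM
    // would never terminate, because this thread never finishes.
    new Thread(() => LockSupport.park()).start()
    // System.exit tears the JVM down anyway, which is why the YARN AM
    // container still stops even with leaked, parked threads.
    System.exit(0)
  }
}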
When the driver runs on Kubernetes in client mode, if it hits an exception and the thread pool has not been shut down, the driver pod may get stuck.
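One possible direction for this improvement, shown only as a hypothetical sketch (runUserMain and its parameters are made-up names, not Spark's actual K8s code path), is to apply the same System.exit wrapping around the user's main on Kubernetes:

object KubernetesDriverExitSketch {
  // Hypothetical wrapper: force the JVM down once the user's main returns or
  // throws, so leaked non-daemon threads cannot keep the driver pod alive.
  def runUserMain(userMain: Array[String] => Unit, args: Array[String]): Unit = {
    var exitCode = 0
    try {
      userMain(args)
    } catch {
      case t: Throwable =>
        t.printStackTrace()
        exitCode = 1
    } finally {
      System.exit(exitCode)
    }
  }
}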
Attachments
Issue Links