Spark / SPARK-29251

Intermittent serialization failures in Spark


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: Spark Core
    • Environment: We are running Spark 2.4.3 on an AWS EMR cluster. Our cluster consists of one driver instance, 3 core instances, and 3 to 12 task instances; the task group autoscales as needed.

    Description

      We are running an EMR cluster on AWS that processes around 100 batch jobs a day. These jobs run various SQL commands to transform data; data volumes range from a few dozen MB to a few GB for most jobs, and up to roughly 1 TB for one particularly large one. We use the Kryo serializer and preregister our classes like so:

      import org.apache.spark.SparkConf

      object KryoRegistrar {

        val classesToRegister: Array[Class[_]] = Array(
          classOf[MyModel]
          // [etc]
        )
      }

      // elsewhere

      val sparkConf = new SparkConf()
        .registerKryoClasses(KryoRegistrar.classesToRegister)
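
      As I understand it, registerKryoClasses just records the class names as strings under spark.kryo.classesToRegister, and every new Kryo instance re-resolves those names with Class.forName. The following is a simplified sketch of that resolution step (my reading of the internals, not Spark's actual code), which is where the ClassNotFoundException below surfaces:

      import com.esotericsoftware.kryo.Kryo
      import org.apache.spark.SparkConf

      // Simplified sketch: each new Kryo instance re-resolves every registered
      // class name via Class.forName. If the classloader in effect cannot see
      // the application jar at that moment, this throws ClassNotFoundException,
      // which Spark wraps as "Failed to register classes with Kryo".
      def registerFromConf(conf: SparkConf, kryo: Kryo): Unit = {
        conf.get("spark.kryo.classesToRegister", "")
          .split(",").map(_.trim).filter(_.nonEmpty)
          .foreach { name =>
            kryo.register(Class.forName(name, false, Thread.currentThread.getContextClassLoader))
          }
      }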
      

       

       

      Intermittently throughout the cluster's operation, we have observed jobs terminating with the following stack trace:


      org.apache.spark.SparkException: Failed to register classes with Kryo
      	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
      	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
      	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
      	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
      	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
      	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
      	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
      	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
      	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
      	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
      	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
      	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
      	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
      	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
      	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
      	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
      	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
      	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
      	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
      	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
      	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
      	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
      	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
      	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
      	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
      	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
      	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
      	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
      	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
      	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
      	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
              [our code that writes data to CSV]	
      Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
      	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
      	... 132 more

      The real name of the model has been changed since this is proprietary code, but otherwise I have reproduced the stack trace in full. The error is not deterministically reproducible and seems to strike at random. It does not appear to be related to the size of the job, and rerunning with identical parameters usually succeeds, although it does superficially seem to coincide with periods of high cluster load. Most of our jobs run with the following parameters:


      spark.driver.memory = 2G
      spark.executor.memory = 4G
      spark.driver.cores = 1
      spark.executor.cores = 2
      spark.executor.instances = 2

      We also use dynamic allocation, which allows up to 14 cores per job, together with fair scheduling; a sketch of the combined configuration follows.
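
      For reference, here is roughly how those settings look assembled on a SparkConf. This is a sketch, not our exact build code: the maxExecutors value of 7 is my shorthand for the 14-core cap (14 cores at 2 cores per executor), and fair scheduling is enabled via spark.scheduler.mode:

      import org.apache.spark.SparkConf

      // Sketch of the per-job configuration described above. maxExecutors = 7
      // is an assumption derived from the 14-core cap at 2 cores per executor.
      val jobConf = new SparkConf()
        .set("spark.driver.memory", "2g")
        .set("spark.executor.memory", "4g")
        .set("spark.driver.cores", "1")
        .set("spark.executor.cores", "2")
        .set("spark.executor.instances", "2")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.maxExecutors", "7")
        .set("spark.scheduler.mode", "FAIR")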

       

      This error has been plaguing us for months, and I don't have a good handle on what to do about it. It looks like a bug in Spark internals, possibly related to SPARK-21928, but that ticket indicates the problem was fixed before 2.4.3, which is the version we run. It is particularly frustrating because the error is not directly reproducible.
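
      Since identical reruns usually succeed, the only stopgap I can think of is a retry wrapper around the write; a rough sketch below (the helper name and attempt count are arbitrary, and this is a workaround, not a fix):

      import org.apache.spark.SparkException

      // Rough stopgap: retry the action when the failure matches the Kryo
      // registration error, since identical reruns usually succeed.
      def withKryoRetry[T](attempts: Int)(body: => T): T =
        try body
        catch {
          case e: SparkException
              if attempts > 1 && e.getMessage != null &&
                e.getMessage.contains("Failed to register classes with Kryo") =>
            withKryoRetry(attempts - 1)(body)
        }

      // e.g. withKryoRetry(3) { df.write.csv(outputPath) }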

       

       


People

  Assignee: Unassigned
  Reporter: Jerry Vinokurov (jvinokurov)
  Votes: 0
  Watchers: 2
