Spark / SPARK-23156

Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.1.1, 2.1.2
    • Fix Version/s: None
    • Component/s: Spark Submit, SQL
    • Labels: None
    • Environment: Ubuntu 16.04, Scala 2.11, Java 8, 8-node YARN cluster.


      I get this error when trying to generate a random DataFrame (300 columns, 5000 rows; Ints, Floats and Timestamps in equal ratios). This is similar (but not identical) to SPARK-18492 and a few other tickets that were supposed to be fixed in 2.1.1.

      Part of the logs is below. They contain hundreds of millions of lines of generated code, apparently for each of the 1,500,000 fields (300 columns × 5000 rows) of the DataFrame, which is very suspicious.

      18/01/19 06:33:15 INFO CodeGenerator: Code generated in 246.168393 ms
      18/01/19 06:33:21 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
      /* 001 */ public java.lang.Object generate(Object[] references) {
      /* 002 */ return new SpecificUnsafeProjection(references);
      /* 003 */ }
      /* 004 */
      /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
      /* 006 */
      /* 007 */ private Object[] references;
      /* 008 */ private org.apache.spark.util.random.XORShiftRandom rng;
      /* 009 */ private org.apache.spark.util.random.XORShiftRandom rng1;
      /* 010 */ private org.apache.spark.util.random.XORShiftRandom rng2;
      /* 011 */ private org.apache.spark.util.random.XORShiftRandom rng3;
      /* 012 */ private org.apache.spark.util.random.XORShiftRandom rng4;


      import java.sql.Timestamp

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{Column, DataFrame, SparkSession}

      class RandomData(val numberOfColumns: Int, val numberOfRows: Int) extends Serializable {
        private val minEpoch = Timestamp.valueOf("1800-01-01 00:00:00").getTime
        private val maxEpoch = Timestamp.valueOf("2200-01-01 00:00:00").getTime
        val idColumn = "id"

        // Entry point: builds the random table and writes it as Parquet under <path>/source.
        def generateData(path: String): Unit = {
          val spark: SparkSession = SparkSession.builder().getOrCreate()
          materializeTable(spark).write.parquet(path + "/source")
        }

        // One id column plus three random columns (time, number, category) per seed.
        private def materializeTable(spark: SparkSession): DataFrame = {
          // The second argument was cut off in the original report; idColumn is assumed.
          val sourceDF = spark.sqlContext.range(0, numberOfRows).withColumnRenamed("id", idColumn)
          val columns = sourceDF(idColumn) +: (0 until numberOfColumns)
            .flatMap(x => Seq(getTimeColumn(x), getNumberColumn(x), getCategoryColumn(x)))
          sourceDF.select(columns: _*)
        }

        // Random timestamp column named time<seed>.
        private def getTimeColumn(seed: Int): Column = {
          val uniqueSeed = seed + numberOfColumns * 3
          rand(seed = uniqueSeed)
            .multiply(maxEpoch - minEpoch)
            .plus(minEpoch / 1000).cast(TimestampType).alias(s"time$seed")
        }

        // Normally distributed double column named <namePrefix><seed>.
        private def getNumberColumn(seed: Int, namePrefix: String = "number"): Column = {
          val uniqueSeed = seed + numberOfColumns * 4
          randn(seed = uniqueSeed).alias(s"$namePrefix$seed")
        }

        // Integer column in [0, 100) named category<seed>.
        private def getCategoryColumn(seed: Int): Column = {
          val uniqueSeed = seed + numberOfColumns * 4
          rand(seed = uniqueSeed).multiply(100).cast("int").alias(s"category$seed")
        }
      }

      object GenerateData {
        // Arguments: <numberOfColumns> <numberOfRows> <outputPath>
        def main(args: Array[String]): Unit = {
          new RandomData(args(0).toInt, args(1).toInt).generateData(args(2))
        }
      }

      Please package a jar and run as follows:

      spark-submit --master yarn \
       --deploy-mode cluster \
       --driver-memory 12g \
       --executor-memory 12g \
       --class GenerateData \
       <path-to-packaged-jar> \
       100 5000 "hdfs:///tmp/parquet"
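
      As a quick sanity check on the sizes quoted in the description, the arithmetic behind the arguments 100 and 5000 can be sketched as follows. This is only an illustration: DataFrameShape and its methods are hypothetical names, not part of the reported code or of Spark. It assumes, as in materializeTable, one id column plus three generated columns per seed, so the exact totals come out slightly above the rounded 300 columns and 1,500,000 fields mentioned in the report.

```scala
// Hypothetical helper, not part of the original report: plain arithmetic, no Spark needed.
object DataFrameShape {
  // materializeTable emits one id column plus three generated columns per seed
  // (time, number, category).
  def columnCount(numberOfColumns: Int): Int = 1 + 3 * numberOfColumns

  // Total number of fields (cells) in the resulting DataFrame.
  def fieldCount(numberOfColumns: Int, numberOfRows: Int): Long =
    columnCount(numberOfColumns).toLong * numberOfRows

  def main(args: Array[String]): Unit = {
    val cols = columnCount(100)        // 1 + 3 * 100 = 301
    val fields = fieldCount(100, 5000) // 301 * 5000 = 1505000
    println(s"columns=$cols fields=$fields")
  }
}
```

      Since an UnsafeProjection is generated per schema rather than per row, one would expect the generated code to scale with the roughly 300 columns, not with the roughly 1.5 million fields, which is why the log volume looks suspicious.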


