Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-45860

ClassCastException with SerializedLambda in Spark Cluster Mode

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.2.1, 3.4.1
    • None
    • Spark Core, Spark Submit
    • None
    • Environment
      Java Version: 11
      Spring Boot Version: 2.7.10
      Spark Version: 3.2.1

    Description

      Issue Description

      Running a Spark application in cluster mode encounters a `java.lang.ClassCastException` related to `java.lang.invoke.SerializedLambda`. This issue seems to be specific to the Spark Cluster mode, and it doesn't occur when running the application locally without Spring Boot.

       

      Steps to Reproduce

      1. Create a dummy dataset
        Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING()); 
      1. Call flatMap function to transform the data
        Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class)); 
      1. Call any action on the transformed dataset
        transformedData.show(); 
      1. Running this Spark application with spark submit command in cluster mode with Spring Boot results in the mentioned ClassCastException.

       

      Complete Code:

       

      @SpringBootApplication(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
      public class SampleSparkJob{
          public static void main(String[] args) {
              SpringApplication.run(DataIngestionServiceApplication.class, args);
      
              SparkSession spark = SparkSession.builder()
                      .appName("SampleSparkJob")
                      .master("local[*]")
                      .getOrCreate();
              Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());
              Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class));
              transformedData.show();
              transformedData.write().mode("append").parquet("outputpath");
              spark.stop();
          }
      }
      class TestDataFlatMap implements FlatMapFunction<String, TestData>, Serializable {
          @Override
          public Iterator<TestData> call(String name) {
              return Arrays.asList(new TestData(name)).iterator();
          }
      }
      @Data
      @AllArgsConstructor
      public class TestData implements Serializable {
          private String name;
      } 

       

      Stack trace:

      WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.248.66.38 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)	at java.base/java.lang.reflect.Method.invoke(Method.java:566)	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)	at java.base/java.lang.reflect.Method.invoke(Method.java:566)	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)	at java.base/java.lang.reflect.Method.invoke(Method.java:566)	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)	at org.apache.spark.scheduler.Task.run(Task.scala:131)	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)	at java.base/java.lang.Thread.run(Thread.java:829) 

       

      Environment
      Java Version: 11
      Spring Boot Version: 2.7.10
      Spark Version: 3.2.1

      Additional Information:

      The issue seems to be related to Spring Boot auto-configuration or the dependencies included with Spring Boot.

      Attachments

        Activity

          People

            Unassigned Unassigned
            punditavi@gmail.com Abhilash
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: