Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-35252

PartitionReaderFactory implementation class in DataSourceV2: sqlConf parameter is null on the executor

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.0.2, 3.1.1
    • None
    • SQL
    • None

    Description

      The code of "MyPartitionReaderFactory":

      // Implemention Class
      package com.lynn.spark.sql.v2
      
      import org.apache.spark.internal.Logging
      import org.apache.spark.sql.catalyst.InternalRow
      import com.lynn.spark.sql.v2.MyPartitionReaderFactory.{MY_VECTORIZED_READER_BATCH_SIZE, MY_VECTORIZED_READER_ENABLED}
      import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
      import org.apache.spark.sql.internal.SQLConf
      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.vectorized.ColumnarBatch
      import org.apache.spark.sql.internal.SQLConf.buildConf
      
      /**
       * Creates row-based or columnar readers for the "my" data source.
       *
       * Every setting this factory needs is read from `sqlConf` exactly once, in the constructor,
       * and kept in plain fields. Per this report, the factory is built on the driver, where
       * `sqlConf` is non-null. The factory then travels to the executor inside the RDD, and there
       * `sqlConf` is observed to be null. The reader-creating methods therefore use only the
       * captured fields and never touch `sqlConf` again.
       *
       * @param sqlConf    session configuration; read only while this factory is being constructed
       * @param dataSchema schema of the underlying data, passed through to the readers
       * @param readSchema schema passed through to the readers (presumably the pruned read schema
       *                   — TODO confirm)
       */
      case class MyPartitionReaderFactory(sqlConf: SQLConf,
                                          dataSchema: StructType,
                                          readSchema: StructType)
        extends PartitionReaderFactory with Logging {

        // Rely on the defaults declared on the config entries (companion object) rather than
        // repeating them here, so the two cannot silently drift apart.
        val enableVectorized: Boolean = sqlConf.getConf(MY_VECTORIZED_READER_ENABLED)
        val batchSize: Int = sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE)

        /** Row-based reader; always available regardless of the vectorized setting. */
        override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
          MyRowReader(batchSize, dataSchema, readSchema)

        /**
         * Columnar reader.
         *
         * @throws UnsupportedOperationException if vectorized reading is disabled
         */
        override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
          if (!supportColumnarReads(partition)) {
            throw new UnsupportedOperationException("Cannot create columnar reader.")
          }
          MyColumnReader(batchSize, dataSchema, readSchema)
        }

        /** Columnar reads are offered for every partition iff the vectorized reader is enabled. */
        override def supportColumnarReads(partition: InputPartition): Boolean = enableVectorized
      }
      
      /**
       * SQL configuration entries read by the `MyPartitionReaderFactory` case class.
       *
       * Both entries are built with `SQLConf.buildConf`, so each one carries its own key,
       * description, version and default value.
       */
      object MyPartitionReaderFactory {

        /** `spark.sql.my.enableVectorizedReader`: boolean, defaults to `false`. */
        val MY_VECTORIZED_READER_ENABLED = {
          val key = "spark.sql.my.enableVectorizedReader"
          buildConf(key)
            .doc("Enables vectorized my source scan.")
            .version("1.0.0")
            .booleanConf
            .createWithDefault(false)
        }

        /** `spark.sql.my.columnarReaderBatchSize`: rows per columnar batch, defaults to `4096`. */
        val MY_VECTORIZED_READER_BATCH_SIZE = {
          val key = "spark.sql.my.columnarReaderBatchSize"
          val description =
            "The number of rows to include in a my source vectorized reader batch. " +
              "The number should be carefully chosen to minimize overhead and avoid OOMs in reading data."
          buildConf(key)
            .doc(description)
            .version("1.0.0")
            .intConf
            .createWithDefault(4096)
        }
      }
      

      The driver constructs an RDD instance (DataSourceRDD), and the sqlConf parameter passed to MyPartitionReaderFactory is not null at that point.
      But when the executor deserializes the RDD, the sqlConf parameter is null.

      The executor-side code that deserializes the RDD is as follows:

      // RunTask.scala
      /**
       * Executor-side body of the task. It deserializes the `(rdd, func)` pair carried in
       * `taskBinary`, records the wall-clock and (when supported) thread CPU time that took, and
       * then applies `func` to the iterator over this task's `partition`.
       *
       * This is the point where the RDD is rebuilt on the executor. For a DataSourceRDD that
       * presumably includes the PartitionReaderFactory it holds, so this is where a null
       * `sqlConf` would first become observable (NOTE(review): confirm against DataSourceRDD).
       */
      override def runTask(context: TaskContext): U = {
          // Deserialize the RDD and the func using the broadcast variables.
          val threadMXBean = ManagementFactory.getThreadMXBean
          val deserializeStartTimeNs = System.nanoTime()
          val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
            threadMXBean.getCurrentThreadCpuTime
          } else 0L
          val ser = SparkEnv.get.closureSerializer.newInstance()
          // Rebuild the (rdd, func) pair from the task binary using the context class loader.
          val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
            ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
          // Elapsed wall-clock time, and thread CPU time when the JVM supports it, spent deserializing.
          _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
          _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
            threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
          } else 0L
      
          // Run the deserialized function over the iterator for this task's partition.
          func(context, rdd.iterator(partition, context))
        }
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              lynnyuan lynn
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated: