Spark / SPARK-38328

Flaky SQLConf.get causes NON-default Spark session settings to be lost


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2, 3.2.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      We have been using two different setups, and the bug is present in both:

      1. Spark 3.1.2

      • Spark on k8s mode
      • Delta 1.0.0 (for Spark 3.1.2)
      • spark.sql.datetime.java8API.enabled=true (NON-default Spark session setting)

      2. Spark 3.2.1

      • Spark on k8s mode
      • Delta 1.1.0 (for Spark 3.2.1)
      • spark.sql.datetime.java8API.enabled=true (NON-default Spark session setting)

      Our Delta merge calls fail at random with the error "java.lang.RuntimeException: java.time.Instant is not a valid external type for schema of timestamp",
      as if support for java.time.Instant were not enabled, even though the Spark session sets spark.sql.datetime.java8API.enabled=true.

      Sometimes simply retrying the failed calls, with no changes, resolves the failures, but the error frequency varies, and so does the number of retries required.

      The stack trace points to the Spark libraries (not the Delta library and not our code), specifically Catalyst:
      ```
      2022-02-03 11:22:53,695 WARN scheduler.TaskSetManager: Lost task 11.0 in stage 23.0 (TID 1542) (10.251.129.64 executor 1): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.time.Instant is not a valid external type for schema of timestamp
      [...similar rows removed...]
      if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, SRC_VALID_FROM_DTTM), TimestampType), true, false) AS SRC_VALID_FROM_DTTM#1749
      [...similar rows removed...]
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
      at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
      at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
      at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:131)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: java.time.Instant is not a valid external type for schema of timestamp
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
      ... 19 more
      ```

      Tracing the error through the Spark sources led us to a piece of code that calls SQLConf.get.datetimeJava8ApiEnabled to select the proper data type.

      [objects.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala)
      ```
      private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"
      ```

      [RowEncoder.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala)
      ```
      def externalDataTypeFor(dt: DataType): DataType = dt match {
        case _ if ScalaReflection.isNativeType(dt) => dt
        case TimestampType =>
          if (SQLConf.get.datetimeJava8ApiEnabled) {
            ObjectType(classOf[java.time.Instant])
          } else {
            ObjectType(classOf[java.sql.Timestamp])
          }
        case DateType =>
          if (SQLConf.get.datetimeJava8ApiEnabled) {
            ObjectType(classOf[java.time.LocalDate])
          } else {
            ObjectType(classOf[java.sql.Date])
          }
        // [...remaining cases omitted...]
      }
      ```
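
To make the failure mode concrete, here is a minimal sketch in plain Scala (no Spark dependency) of how the flag value seen at encoding time decides whether an Instant is accepted. The object `ExternalTypeSketch` and its methods are hypothetical stand-ins for the excerpts above, not Spark's actual implementation.

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical, simplified stand-in for RowEncoder.externalDataTypeFor and the
// external type check; this is an illustration, not Spark code.
object ExternalTypeSketch {
  // Picks the external class for a timestamp column from the flag value
  // that the calling thread happens to see.
  def externalTimestampClass(java8ApiEnabled: Boolean): Class[_] =
    if (java8ApiEnabled) classOf[Instant] else classOf[Timestamp]

  // Mimics the runtime check: the value must be an instance of the expected class.
  def validate(value: AnyRef, expected: Class[_]): AnyRef = {
    if (!expected.isInstance(value)) {
      throw new RuntimeException(
        s"${value.getClass.getName} is not a valid external type for schema of timestamp")
    }
    value
  }

  // Returns "accepted" or the error message for a given flag value.
  def outcome(value: AnyRef, java8ApiEnabled: Boolean): String =
    try {
      validate(value, externalTimestampClass(java8ApiEnabled))
      "accepted"
    } catch {
      case e: RuntimeException => e.getMessage
    }

  def main(args: Array[String]): Unit = {
    val value: AnyRef = Instant.parse("2022-02-03T11:22:53Z")
    println(outcome(value, java8ApiEnabled = true))  // flag seen as set in the session
    println(outcome(value, java8ApiEnabled = false)) // flag seen as the default
  }
}
```

If the flag is read as `false` while the rows carry `java.time.Instant` values, the check fails with the same message as in the stack trace.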

      The result of SQLConf.get depends on whether an active SparkSession exists, so non-default Spark session settings are sometimes lost on executors, depending on when they are started.

      [SQLConf.get](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L149)
      ```

      /**
       * Returns the active config object within the current scope. If there is an active SparkSession,
       * the proper SQLConf associated with the thread's active session is used. If it's called from
       * tasks in the executor side, a SQLConf will be created from job local properties, which are set
       * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
       * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
       * from DataFrames.
       */
        ```
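
The lookup described in that comment can be modelled roughly as follows. This is a heavily simplified sketch in plain Scala. `ConfLookupSketch`, `Scope` and `resolve` are hypothetical names, and the real method has more branches than this model covers. It only shows that a caller that sees neither source of overrides ends up with the built-in default.

```scala
// Simplified, hypothetical model of the lookup described in the SQLConf.get
// doc comment. None of these names are Spark APIs.
object ConfLookupSketch {
  type Conf = Map[String, String]

  val Java8ApiKey = "spark.sql.datetime.java8API.enabled"
  val Java8ApiDefault = "false" // built-in default for this key

  // What the calling thread can see when it asks for the conf.
  final case class Scope(
      activeSessionConf: Option[Conf],  // present when the thread has an active session
      taskLocalProperties: Option[Conf] // present when running inside a task
  )

  // Use the session conf if visible, else the task's local properties,
  // else fall back to the built-in default.
  def resolve(scope: Scope, key: String, default: String): String =
    scope.activeSessionConf
      .orElse(scope.taskLocalProperties)
      .flatMap(_.get(key))
      .getOrElse(default)

  def main(args: Array[String]): Unit = {
    val overrides: Conf = Map(Java8ApiKey -> "true")
    val scopes = Seq(
      "driver thread with active session" -> Scope(Some(overrides), None),
      "task with propagated properties" -> Scope(None, Some(overrides)),
      "task whose properties lack the key" -> Scope(None, Some(Map.empty[String, String])),
      "thread with neither" -> Scope(None, None)
    )
    for ((label, scope) <- scopes)
      println(s"$label: ${resolve(scope, Java8ApiKey, Java8ApiDefault)}")
  }
}
```

In this model the last two scopes resolve to the default, which matches the symptom we observe.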

      We inferred that `SQLConf.get` (getConf) could be flaky, which would explain the random error above.

      In fact, the issue disappears with a custom Spark build in which either the above `if` on SQLConf.get.datetimeJava8ApiEnabled is short-circuited to always select Instant or, better, the default in SQLConf.scala is changed:

      [original SQLConf.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2613)

      our change:
      ```
      val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
        .doc("If the configuration property is set to true, java.time.Instant and " +
          "java.time.LocalDate classes of Java 8 API are used as external types for " +
          "Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp " +
          "and java.sql.Date are used for the same purpose.")
        .version("3.0.0")
        .booleanConf
        .createWithDefault(true) // our test change
        // .createWithDefault(false)
      ```

      Browsing the Spark JIRA, we found several issues (some open, some closed) that all point to similar problems with SQLConf.get.

      • [SPARK-23894](https://issues.apache.org/jira/browse/SPARK-23894) Flaky Test: BucketedWriteWithoutHiveSupportSuite
        ```
        So how come sometimes its defined? Note that activeSession is an Inheritable thread local. Normally the executor threads are created before activeSession is defined, so they don't inherit anything. But a threadpool is free to create more threads at any time. And when they do, then suddenly the new executor threads will inherit the active session from their parent, a thread in the driver with the activeSession defined.
        ```
      • [SPARK-22938](https://issues.apache.org/jira/browse/SPARK-22938) Assert that SQLConf.get is accessed only on the driver.
        ```
        Assert if code tries to access SQLConf.get on executor.
        This can lead to hard to detect bugs, where the executor will read fallbackConf, falling back to default config values, ignoring potentially changed non-default configs.
        If a config is to be passed to executor code, it needs to be read on the driver, and passed explicitly.
        ```
      • [SPARK-30798](https://issues.apache.org/jira/browse/SPARK-30798) Scope Session.active in QueryExecution
        ```
        SparkSession.active is a thread local variable that points to the current thread's spark session. It is important to note that the SQLConf.get method depends on SparkSession.active. In the current implementation it is possible that SparkSession.active points to a different session which causes various problems. Most of these problems arise because part of the query processing is done using the configurations of a different session.
        ```
      • [SPARK-30223](https://issues.apache.org/jira/browse/SPARK-30223) queries in thrift server may read wrong SQL configs
        ```
        The Spark thrift server creates many SparkSessions to serve requests, and the thrift server serves requests using a single thread. One thread can only have one active SparkSession, so SQLConf.get can't get the proper conf from the session that runs the query.
        ```
      • [SPARK-35252](https://issues.apache.org/jira/browse/SPARK-35252) PartitionReaderFactory's Implemention Class of DataSourceV2: sqlConf parameter is null
        ```
        The driver constructs an RDD instance (DataSourceRDD), and the sqlConf parameter passed to MyPartitionReaderFactory is not null. But when the executor deserializes the RDD, the sqlConf parameter is null.
        ```
      • [SPARK-35324](https://issues.apache.org/jira/browse/SPARK-35324) Spark SQL configs not respected in RDDs

      I think the difference might have to do with the fact that in the RDD case the config isn't in the local properties of the TaskContext.

      * Stepping through the debugger, I see that both RDD and Dataset decide on using or not using the legacy date formatter in DateFormatter.getFormatter.
      * Then in SQLConf.get, both cases find a TaskContext and no existingConf. So they create a new ReadOnlySQLConf from the TaskContext object.
      * RDD and Dataset code path differ in the local properties they find on the TaskContext here. The Dataset code path has spark.sql.legacy.timeParserPolicy in the local properties, but the RDD path doesn't. The ReadOnlySQLConf is created from the local properties, so in the RDD path the resulting config object doesn't have an override for spark.sql.legacy.timeParserPolicy.
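
The thread-inheritance effect described in the SPARK-23894 quote above can be reproduced outside Spark. The following is a minimal sketch in plain Scala using `java.lang.InheritableThreadLocal` and a standard fixed thread pool. `InheritedSessionSketch` and the string "driver-session" are hypothetical stand-ins for the active session, not Spark code.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for the inheritable thread-local that holds the
// active session; this is not Spark code.
object InheritedSessionSketch {
  private val activeSession = new InheritableThreadLocal[String]

  // Returns what the first and the second pool thread see as "active session".
  def run(): (String, String) = {
    // A fixed pool creates its worker threads lazily, one per submitted task,
    // until the pool size (2 here) is reached.
    val pool = Executors.newFixedThreadPool(2)
    val readSession = new Callable[String] {
      override def call(): String = Option(activeSession.get()).getOrElse("<none>")
    }
    try {
      // Worker 1 is created before the parent thread has an active session.
      val early = pool.submit(readSession).get()
      activeSession.set("driver-session")
      // Worker 2 is created afterwards and inherits the parent's value.
      val late = pool.submit(readSession).get()
      (early, late)
    } finally {
      activeSession.remove()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (early, late) = run()
    println(s"thread created before set: $early")
    println(s"thread created after set:  $late")
  }
}
```

Two threads of the same pool see different values depending only on when each thread was created, which is consistent with the timing-dependent behavior we observe.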
       
       

            People

              Assignee: Unassigned
              Reporter: Andrea Picasso Ratto (andrea0pica)