Spark / SPARK-36411

Unable to Write non-Delta Formats to Azure Data Lake Gen2 Due to Owner of the Parent Folder's Permissions


Details

    Description

      When writing a Spark DataFrame to Azure Data Lake Gen2 storage using any format other than Delta, a folder is created on the filesystem as expected, but the owner of that folder is assigned the same permissions as the owner of the parent folder. If the parent folder's owner has no access permissions, the write command creates the new folder with those same (empty) permissions for its owner, so the owner cannot write into the folder that was just created. According to the ADLS Gen2 engineering team, this is expected behavior.
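
      For illustration, a minimal write that reproduces the failure might look like the following (the abfss URI is a placeholder for a folder whose parent's owner has no access permissions, and spark is an active SparkSession):

      // hypothetical target folder on ADLS Gen2
      val targetPath = "abfss://container@storageName.dfs.core.windows.net/path/to/folder"

      // any non-Delta format (Parquet here) lets the storage create the folder implicitly,
      // so the new folder's owner inherits the parent folder owner's (missing) permissions
      val df = spark.range(10).toDF("id")
      df.write.format("parquet").mode("overwrite").save(targetPath)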

      The write will fail with the following error:

      Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (10.10.10.10 executor 0): Operation failed: "This request is not authorized to perform this operation using this permission.", 403, PUT, https://storageName.dfs.core.windows.net/path/to/folder/_started_2238238532712736832?resource=file&timeout=90, AuthorizationPermissionMismatch, "This request is not authorized to perform this operation using this permission. RequestId:xxxx Time:2021-08-03T08:35:26.1753745Z"
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:237)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsClient.createPath(AbfsClient.java:311)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:501)
        at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:208)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
        at com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol.newTaskTempFileAbsPath(DirectoryAtomicCommitProtocol.scala:123)
        at com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol.newTaskTempFile(DirectoryAtomicCommitProtocol.scala:104)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:121)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:327)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
        at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.Task.run(Task.scala:91)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:789)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)

      Suggested solution:

      Instead of letting the storage create the folder implicitly while writing the files, we can create the folder on the storage first using fs.mkdirs() (the same way the Delta writer creates the _delta_log folder); this should fix the issue:

      Delta writer:

      // first create the _delta_log directory explicitly
      import org.apache.hadoop.fs.Path

      val logPath = new Path(deltaPath, "_delta_log")
      val fs = logPath.getFileSystem(spark.sessionState.newHadoopConf())
      fs.mkdirs(logPath)

      Parquet writer (current behavior: the start-marker file is created directly, so the parent folder is created implicitly by the storage):

      val finalPath = new Path(parquetPath, fileName)
      val fs = finalPath.getFileSystem(spark.sessionState.newHadoopConf())
      val txnId = math.abs(scala.util.Random.nextLong).toString
      val startMarker = new Path(finalPath.getParent, new Path(s"started$txnId"))
      fs.create(startMarker, false).close()

      Instead, for the Parquet/CSV/JSON/etc. writers, we can add an fs.mkdirs(parquetPath) call to create the folder explicitly as a first step, before writing any files, when the target is ADLS Gen1 or ADLS Gen2.
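
      As a sketch of that suggestion from the job's side (assuming parquetPath is the target folder path as a String and df is the DataFrame being written, as in the snippets above), the folder can be pre-created before the write instead of relying on implicit creation by the storage:

      import org.apache.hadoop.fs.Path

      // create the target folder explicitly first, mirroring what the Delta writer does for _delta_log
      val targetDir = new Path(parquetPath)
      val fs = targetDir.getFileSystem(spark.sessionState.newHadoopConf())
      fs.mkdirs(targetDir)

      // then write the data into the pre-created folder
      df.write.format("parquet").mode("overwrite").save(parquetPath)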


          People

            Assignee: Unassigned
            Reporter: Osama Suleiman (ossuleim)
