Details
- Type: Bug
- Status: Open
- Priority: Minor
- Resolution: Unresolved
- Affects Version/s: 3.1.2
- Fix Version/s: None
- Component/s: None
- Environment: Azure Databricks 8.4, Spark 3.1.2, https://docs.microsoft.com/en-us/azure/databricks/release-notes/runtime/8.4#system-environment. Azure Data Lake Gen2 used as the location for the writers.
Description
When writing a Spark DataFrame to Azure Data Lake Gen2 storage using any format other than "Delta", a folder is created on the filesystem as expected, but the new folder inherits the permissions of the parent folder's owner. If the parent folder's owner has no access permissions, the write command creates the new folder with those same (missing) permissions for the new owner, so subsequent operations inside it are denied. According to the ADLS Gen2 engineering team, this inheritance is expected behavior.
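A minimal reproduction sketch (the storage account, container, and path are placeholders; it assumes the cluster identity has no access permissions on the parent folder):

```scala
// Any non-Delta format triggers the implicit folder creation, e.g.:
val df = spark.range(10).toDF("id")
df.write
  .format("parquet") // same failure with "csv", "json", etc.
  .save("abfss://container@storageName.dfs.core.windows.net/path/to/folder")
```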
The write will fail with the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (10.10.10.10 executor 0): Operation failed: "This request is not authorized to perform this operation using this permission.", 403, PUT, https://storageName.dfs.core.windows.net/path/to/folder/_started_2238238532712736832?resource=file&timeout=90, AuthorizationPermissionMismatch, "This request is not authorized to perform this operation using this permission. RequestId:xxxx Time:2021-08-03T08:35:26.1753745Z"
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:237)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsClient.createPath(AbfsClient.java:311)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:501)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:208)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol.newTaskTempFileAbsPath(DirectoryAtomicCommitProtocol.scala:123)
	at com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol.newTaskTempFile(DirectoryAtomicCommitProtocol.scala:104)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:121)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:327)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:91)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:789)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
Suggested solution:
Instead of letting the storage create the folder implicitly while the files are being written, we can create the folder explicitly first with fs.mkdirs() (the same thing the Delta writer already does to create the delta log folder). This should fix the issue:
Delta writer:
// The Delta writer pre-creates the _delta_log directory, so the folder
// exists (with its own ACLs) before any file is written into it:
import org.apache.hadoop.fs.Path
val logPath = new Path(deltaPath, "_delta_log")
val fs = logPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.mkdirs(logPath)
Parquet writer:
// The Parquet writer, by contrast, starts by creating a _started_<txnId>
// marker file directly, which implicitly creates the destination folder
// with the inherited (missing) permissions:
import org.apache.hadoop.fs.Path
val finalPath = new Path(parquetPath, fileName)
val fs = finalPath.getFileSystem(spark.sessionState.newHadoopConf())
val txnId = math.abs(scala.util.Random.nextLong).toString
val startMarker = new Path(finalPath.getParent, new Path(s"_started_$txnId"))
fs.create(startMarker, false).close()
For the Parquet/CSV/JSON etc. writers, we could therefore add an "fs.mkdirs(parquetPath)" call that creates the destination folder as a first step, before any files are written, when the target is ADLS Gen1 or ADLS Gen2.
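The proposed workaround can be sketched from the user side as follows (a sketch only: writeParquetWithMkdirs is a hypothetical helper, df and parquetPath are placeholders, and the real fix would live inside the commit protocol rather than in user code):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

// Hypothetical helper illustrating the suggested fix: pre-create the
// destination folder so it exists before any task writes a
// _started_<txnId> marker or data file into it, mirroring what the
// Delta writer does for the _delta_log folder.
def writeParquetWithMkdirs(df: DataFrame, parquetPath: String): Unit = {
  val outPath = new Path(parquetPath)
  val fs = outPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
  fs.mkdirs(outPath) // explicit folder creation before the write
  df.write.mode("overwrite").parquet(parquetPath)
}
```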