SPARK-22403: StructuredKafkaWordCount example fails in YARN cluster mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application fails during initialization and dies after the default number of YARN application attempts. In the AM log, I see

      17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
      17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_000001/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
      org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
      ...
              at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
      	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
      	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
      	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
      	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
      
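      For reference, the start() call at StructuredKafkaWordCount.scala:79 in the stack trace is the example starting its console sink without an explicit checkpointLocation option. A condensed version of the example (the broker address and topic below are placeholders):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
        import spark.implicits._

        // Read records from Kafka and extract the message value as a string.
        val lines = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
          .option("subscribe", "wordcount")                   // placeholder
          .load()
          .selectExpr("CAST(value AS STRING)")
          .as[String]

        val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

        // No checkpointLocation is set, so StreamingQueryManager falls back to a temporary
        // checkpoint directory (the console sink allows useTempCheckpointLocation).
        val query = wordCounts.writeStream
          .outputMode("complete")
          .format("console")
          .start()

        query.awaitTermination()
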

      Looking at StreamingQueryManager#createQuery, we have
      https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198

          val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
            ...
          }.orElse {
            ...
          }.getOrElse {
            if (useTempCheckpointLocation) {
              // Delete the temp checkpoint when a query is being stopped without errors.
              deleteCheckpointOnStop = true
              Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
            } else {
              ...
            }
          }
      

      And Utils.createTempDir has

        def createTempDir(
            root: String = System.getProperty("java.io.tmpdir"),
            namePrefix: String = "spark"): File = {
          val dir = createDirectory(root, namePrefix)
          ShutdownHookManager.registerShutdownDeleteDir(dir)
          dir
        }
      

      In client mode, java.io.tmpdir is set to "/tmp", which also exists in HDFS and has permissions 1777. In cluster mode, java.io.tmpdir is set in the YARN AM to "$PWD/tmp", where PWD is "${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}".
      The problem is that Spark is using java.io.tmpdir, which is a path in the local filesystem, as a path in HDFS. When that path is "/tmp", which happens to exist in HDFS, no problem arises, but that is just by coincidence.
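      To see why "/tmp" only works by coincidence: StreamMetadata.write passes the metadata path to the Hadoop FileSystem API, and a path with no scheme resolves against fs.defaultFS, which on a YARN cluster is normally HDFS. A minimal sketch of that resolution (the namenode address and temporary directory names are illustrative):

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileSystem, Path}

        // Assume fs.defaultFS = hdfs://namenode:8020 is picked up from core-site.xml.
        val hadoopConf = new Configuration()

        // Client mode: java.io.tmpdir is "/tmp", which also happens to exist in HDFS with 1777.
        val clientModePath = new Path("/tmp/temporary-xyz/metadata")

        // Cluster mode: java.io.tmpdir is the container-local "$PWD/tmp", whose ancestors do not
        // exist in HDFS, and "/" is writable only by hdfs (drwxr-xr-x).
        val clusterModePath = new Path(
          "/data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047",
          "container_1508800814252_0047_01_000001/tmp/temporary-xyz/metadata")

        // Neither path has a scheme, so both resolve to the default (HDFS) filesystem.
        val fs: FileSystem = clientModePath.getFileSystem(hadoopConf)

        fs.create(clientModePath)   // succeeds: /tmp exists in HDFS and is world-writable
        fs.create(clusterModePath)  // AccessControlException: cannot create ancestors under "/"
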

People

    Assignee: Wing Yew Poon (wypoon)
    Reporter: Wing Yew Poon (wypoon)
