Spark / SPARK-26558

java.util.NoSuchElementException while saving data into HDFS using Spark


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: Spark Core, Spark Submit
    • Labels: None

    Description

      How can I fix a java.util.NoSuchElementException while saving data into HDFS using Spark?

       

      I'm trying to ingest a Greenplum table into HDFS using the spark-greenplum reader.

      Below are the versions of Spark & Scala I am using:

      spark-core: 2.0.0
      spark-sql: 2.0.0
      Scala version: 2.11.8
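
      For reference, a minimal build.sbt matching these versions might look like the following. This is only a sketch; my actual build file is not included in this report, and the greenplum-spark connector jar is supplied at submit time (see the spark-submit command further below) rather than through sbt:

      scalaVersion := "2.11.8"

      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
        "org.apache.spark" %% "spark-sql"  % "2.0.0" % "provided",
        // spark-hive is needed on the classpath for enableHiveSupport() in the code below
        "org.apache.spark" %% "spark-hive" % "2.0.0" % "provided"
      )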

      To do that, I wrote the following code:

       

      package com.partition.source   // class name taken from the spark-submit command below

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.sql.functions.{col, column, lit, regexp_replace}
      import org.apache.spark.sql.types.StringType
      import scala.collection.mutable.ListBuffer

      object YearPartition {
        def main(args: Array[String]): Unit = {

          // hiveMetaConURL, metaUserName, metaPassword, connectionUrl, devUserName,
          // devPassword and textType are defined elsewhere (e.g. read from the
          // testconnection.properties file passed with --files) and are not shown here.

          val conf = new SparkConf().setAppName("TEST_YEAR")
            .set("spark.executor.heartbeatInterval", "1200s")
            .set("spark.network.timeout", "12000s")
            .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
            .set("spark.shuffle.compress", "true")
            .set("spark.shuffle.spill.compress", "true")
            .set("spark.sql.orc.filterPushdown", "true")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryoserializer.buffer.max", "512m")
            .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
            .set("spark.streaming.stopGracefullyOnShutdown", "true")
            .set("spark.yarn.driver.memoryOverhead", "8192")
            .set("spark.yarn.executor.memoryOverhead", "8192")
            .set("spark.sql.shuffle.partitions", "400")
            .set("spark.dynamicAllocation.enabled", "false")
            .set("spark.shuffle.service.enabled", "true")
            .set("spark.sql.tungsten.enabled", "true")
            .set("spark.executor.instances", "12")
            .set("spark.executor.memory", "13g")
            .set("spark.executor.cores", "4")
            .set("spark.files.maxPartitionBytes", "268435468")

          val flagCol = "del_flag"
          val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
            .config("hive.exec.dynamic.partition", "true")
            .config("hive.exec.dynamic.partition.mode", "nonstrict")
            .getOrCreate()
          import spark.implicits._

          // Column/type metadata for the target table, read over JDBC from the metadata database.
          val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
            .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
            .option("user", metaUserName).option("password", metaPassword).load()

          val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
            .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
            .option("user", metaUserName).option("password", metaPassword).load()

          val dataMapper = dtypes.as[(String, String)].collect().toMap
          val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
          val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
          val splitColumns = gpCols.split("\\|").toList
          val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
          val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
          val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
          val partCList = prtn_String_columns.split(",").map(x => col(x))

          // Build "<col>::<textType> as <col>_text" expressions for the precision columns.
          val precisionColsText = ListBuffer[String]()   // declarations were omitted in the original snippet
          val textList = ListBuffer[String]()
          var splitPrecisionCols = precisionCols.split(",")
          for (i <- splitPrecisionCols) {
            precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
            textList += s"${i}_text:${textType}"
          }
          val pCols = precisionColsText.mkString(",")
          val allColumns = gpColumns.concat("," + pCols)
          val allColumnsSeq = allColumns.split(",").toSeq
          val allColumnsSeqC = allColumnsSeq.map(x => column(x))
          val gpColSeq = gpColumns.split(",").toSeq

          def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                             dataMapper: Map[String, String], partition_columns: Array[String],
                             spark: SparkSession): DataFrame = {
            // Read the Greenplum table through the greenplum-spark connector.
            val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
              .option("url", connectionUrl)
              .option("dbtable", "empdocs")
              .option("dbschema", "gpschema")
              .option("user", devUserName).option("password", devPassword)
              .option("partitionColumn", "header_id")
              .load()
              .where("year=2017 and month=12")
              .select(gpColSeq map col: _*)
              .withColumn(flagCol, lit(0))
            val totalCols: List[String] = splitColumns ++ textList
            // Move the partition columns to the end of the column list.
            val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
            val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
            val resultDF = yearDF.select(allCols: _*)
            // Strip newlines and tabs from all string columns.
            val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
            val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
              tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
            }
            finalDF
          }

          val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
          dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
        }
      }

       

      When I submit the job, I can see that the tasks for the lines below complete successfully:

       
      val dataMapper = dtypes.as[(String, String)].collect().toMap
      val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
      val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
      val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
      val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
       
      

       

      Once the job reaches the task that saves the prepared dataframe, which is:

      dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")

      it ends with the exception:

      java.util.NoSuchElementException
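
      Since this write is the first action that actually scans the Greenplum table (the earlier collect() calls only touch the metadata tables over JDBC), one check I could try to narrow this down is to materialize dataDF before saving it. This is only a sketch of that check, not code from the failing job:

      // Sketch only: force the Greenplum read before the CSV write, to see whether
      // the NoSuchElementException already occurs during the read.
      import org.apache.spark.storage.StorageLevel

      dataDF.persist(StorageLevel.MEMORY_AND_DISK)
      println(s"rows read from Greenplum: ${dataDF.count()}")   // triggers the scan

      // If the count succeeds, retry the save on the cached data.
      dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
      dataDF.unpersist()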

      I am submitting the job using the spark-submit command below:

      SPARK_MAJOR_VERSION=2 spark-submit \
        --class com.partition.source.YearPartition \
        --master=yarn \
        --conf spark.ui.port=4090 \
        --driver-class-path /home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
        --conf spark.jars=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
        --executor-cores 3 --executor-memory 13G \
        --keytab /home/hdpdevusr/hdpdevusr.keytab --principal hdpdevusr@usrdev.COM \
        --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
        --name Splinter \
        --conf spark.executor.extraClassPath=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
        splinter_2.11-0.1.jar

      I can see that the command launches the executors as specified in the code: 12 executors with 4 cores each.

      Only 5 out of the 48 tasks complete, and the job ends with the exception:

      [Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
          at scala.None$.get(Option.scala:347)
          at scala.None$.get(Option.scala:345)
          at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
          at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
          at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
          at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
          at org.apache.spark.scheduler.Task.run(Task.scala:108)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.SparkException: Job 5 cancelled because killed via the Web UI
          at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
          at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1457)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1446)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
          at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1439)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1701)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
          at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
          ... 44 more
      18/12/27 10:30:53 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
      java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(FutureTask.java:205)
          at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
      18/12/27 10:30:53 ERROR Utils: Uncaught exception in thread pool-6-thread-1
      java.lang.InterruptedException
          at java.lang.Object.wait(Native Method)
          at java.lang.Thread.join(Thread.java:1249)
          at java.lang.Thread.join(Thread.java:1323)
          at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:199)
          at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1919)
          at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
          at org.apache.spark.SparkContext.stop(SparkContext.scala:1918)
          at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
          at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
          at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
          at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
          at scala.util.Try$.apply(Try.scala:192)
          at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
          at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)

       

      I don't understand where it went wrong, whether in the code or in any configuration applied to the job.

      I posted the same question on Stack Overflow as well; the executor screenshots can be seen there:
      https://stackoverflow.com/questions/54002002/how-to-fix-java-util-nosuchelementexception-while-saving-data-into-hdfs-using-sp/54002423?noredirect=1#comment94843141_54002423

      Could anyone let me know how to fix this exception?

       

      Attachments

        1. OKVMg.png (148 kB, attached by Sidhartha)
        2. k5EWv.png (529 kB, attached by Sidhartha)

          People

            Assignee: Unassigned
            Reporter: Sidhartha (bobbysidhartha)
            pankaj arora
            Votes: 0
            Watchers: 2
