  Spark / SPARK-2579

Reading from S3 returns an inconsistent number of items with Spark 0.9.1


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Cannot Reproduce
    • Affects Version/s: 0.9.1
    • Fix Version/s: None
    • Component/s: Input/Output

      Description

      I have created a random matrix of 1M rows with 10K items on each row, semicolon-separated. When I read it with Spark 0.9.1 and do a count, I consistently get fewer than 1M rows, and a different number every time (!!). Example below:

      head -n 1 tool-generate-random-matrix*log
      ==> tool-generate-random-matrix-999158.log <==
      Row item counts: 999158

      ==> tool-generate-random-matrix.log <==
      Row item counts: 997163
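      A quick sketch of the arithmetic behind the two counts above, assuming (as stated in the report) 1000 partitions of 1000 rows each. The object name `MissingRows` is illustrative. Neither shortfall is a multiple of 1000, so in both runs at least one partition apparently came back with only part of its rows, rather than whole partitions going missing.

```scala
// Illustrative arithmetic on the two reported counts.
// Assumes 1000 partitions x 1000 rows = 1,000,000 expected rows.
object MissingRows {
  val Partitions = 1000
  val RowsPerPartition = 1000
  val Expected: Long = Partitions.toLong * RowsPerPartition

  // Number of rows absent from an observed count
  def missing(observed: Long): Long = Expected - observed

  def main(args: Array[String]): Unit = {
    for (observed <- Seq(999158L, 997163L)) {
      val m = missing(observed)
      // If only whole partitions were dropped, m would be a multiple of 1000
      println(s"observed=$observed missing=$m wholePartitions=${m % RowsPerPartition == 0}")
    }
  }
}
```

      Running `main` prints `observed=999158 missing=842 wholePartitions=false` and `observed=997163 missing=2837 wholePartitions=false`.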

      The data is split into 1000 partitions. When I download it with s3cmd sync and run the following AWK script on it, I get the correct number of rows in each partition (1000 × 1000 = 1M). What is going on?

      checkrows.sh
      for k in part-0*
      do
        echo "$k"
        awk -F ";" '
          NF != 10000 {
            print "Wrong number of items:",NF
          }
      
          END {
            if (NR != 1000) {
              print "Wrong number of rows:",NR
            }
          }' "$k"
      done
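      A rough standalone Scala port of checkrows.sh, for anyone who prefers to run the same check on the JVM. `CheckRows` and `check` are hypothetical names, and the sketch assumes the part files have already been downloaded (e.g. with s3cmd sync) into a local directory passed as the first argument. Like the AWK script, it reports lines whose `;`-separated field count is not 10000 and files that do not contain exactly 1000 lines.

```scala
import java.io.File
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Hypothetical port of checkrows.sh; not part of the original report.
object CheckRows {
  val ExpectedFields = 10000
  val ExpectedRows = 1000

  /** Returns problem messages for one file's lines, in the same wording as the AWK script. */
  def check(lines: Iterator[String], expectedFields: Int, expectedRows: Int): List[String] = {
    var rows = 0
    val problems = ListBuffer[String]()
    for (line <- lines) {
      rows += 1
      // With FS=";" awk counts trailing empty fields and gives NF=0 for an
      // empty line; split with limit -1 plus the isEmpty case matches that.
      val nf = if (line.isEmpty) 0 else line.split(";", -1).length
      if (nf != expectedFields) problems += s"Wrong number of items: $nf"
    }
    if (rows != expectedRows) problems += s"Wrong number of rows: $rows"
    problems.toList
  }

  def main(args: Array[String]): Unit = {
    val dir = new File(if (args.nonEmpty) args(0) else ".")
    // Same file selection as the shell glob part-0*
    val parts = Option(dir.listFiles).getOrElse(Array.empty[File])
      .filter(_.getName.startsWith("part-0"))
      .sortBy(_.getName)
    for (f <- parts) {
      println(f.getName)
      val src = Source.fromFile(f)
      try check(src.getLines(), ExpectedFields, ExpectedRows).foreach(println)
      finally src.close()
    }
  }
}
```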
      

      The matrix generation and counting code is below:

      Matrix.scala
      package fi.helsinki.cs.nodes.matrix
      
      import java.util.Random
      import org.apache.spark._
      import org.apache.spark.SparkContext._
      import scala.collection.mutable.ListBuffer
      import org.apache.spark.rdd.RDD
      import org.apache.spark.storage.StorageLevel._
      
      object GenerateRandomMatrix {
        def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
          rdd.mapPartitions(part => part.map(xarr => {
              val rdm = new Random(rSeed + xarr)
              val arr = new Array[Double](features)
              for (i <- 0 until features)
                arr(i) = rdm.nextDouble()
              new Row(xarr, arr)
            }))
        }
      
        case class Row(id: Int, elements: Array[Double]) {}
      
        def rowFromText(line: String) = {
          val idarr = line.split(" ")
          val arr = idarr(1).split(";")
          // -1 to fix saved matrix indexing error
          new Row(idarr(0).toInt-1, arr.map(_.toDouble))
        }
      
        def main(args: Array[String]) {
          val master = args(0)
          val tasks = args(1).toInt
          val savePath = args(2)
          val read = args.contains("read")
          
          val datapoints = 1000000
          val features = 10000
      
          val sc = new SparkContext(master, "RandomMatrix")
          if (read) {
            val randomMatrix: RDD[Row] = sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
            println("Row item counts: "+ randomMatrix.count)
          } else {
            val rdd = sc.parallelize(0 until datapoints, tasks)
            val bcSeed = sc.broadcast(128)
            /* Generating a matrix of random Doubles */
            val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
            randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
          }
          
          sc.stop
        }
      }
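      A standalone check, without Spark, that the text format written by the `else` branch above and parsed by `rowFromText` preserves each row. `RowFormatRoundTrip`, `toLine` and `fromLine` are hypothetical names; `fromLine` omits the `-1` id correction, and the seed mirrors `rSeed + xarr` with the seed value 128 from the report. Because `Double.toString` emits enough digits to recover the exact value, parsing should reproduce every element. This only shows the per-row content is not lossy; it says nothing about lines that never reach `rowFromText`.

```scala
import java.util.Random

// Hypothetical standalone copies of the save format and parser from Matrix.scala.
object RowFormatRoundTrip {
  case class Row(id: Int, elements: Array[Double])

  // Same layout as the saveAsTextFile step: "id e0;e1;...;eN"
  def toLine(row: Row): String = s"${row.id} ${row.elements.mkString(";")}"

  // Same parsing as rowFromText, without the -1 id correction
  def fromLine(line: String): Row = {
    val idarr = line.split(" ")
    Row(idarr(0).toInt, idarr(1).split(";").map(_.toDouble))
  }

  def main(args: Array[String]): Unit = {
    val id = 42
    val rdm = new Random(128 + id) // mirrors new Random(rSeed + xarr)
    val row = Row(id, Array.fill(10000)(rdm.nextDouble()))
    val parsed = fromLine(toLine(row))
    println(s"id ok: ${parsed.id == row.id}, elements ok: ${parsed.elements.sameElements(row.elements)}")
  }
}
```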
      

      I run this with:
      appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err

      Reading from HDFS gives the correct count and the correct number of items on each row. However, I had to pass the full path including the server name; just /matrix does not work (it thinks I want file://):

      p="hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix"
      appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 "$p" read 1>readmatrix.log 2>readmatrix.err
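      A likely explanation for the bare /matrix behaviour: Hadoop generally qualifies a path that has no scheme against the default filesystem URI (`fs.default.name` in Hadoop 1, `fs.defaultFS` in Hadoop 2), which defaults to `file:///` unless the application's Hadoop configuration points it at the namenode. The sketch below only imitates that resolution with `java.net.URI`; `QualifyPath.qualify` is a hypothetical helper, not Hadoop's implementation, and `namenode:9000` is a placeholder.

```scala
import java.net.URI

// Hypothetical helper imitating how a scheme-less path is qualified
// against a default filesystem URI. Not Hadoop's actual code.
object QualifyPath {
  def qualify(path: String, defaultFs: String): URI = {
    val uri = new URI(path)
    // A path that already carries a scheme (hdfs://, s3n://, file://) is kept as-is
    if (uri.getScheme != null) uri
    // Otherwise it inherits the default filesystem's scheme and authority
    else new URI(defaultFs).resolve(uri)
  }

  def main(args: Array[String]): Unit = {
    println(qualify("/matrix", "file:///"))             // lands on the local filesystem
    println(qualify("/matrix", "hdfs://namenode:9000")) // lands on HDFS
    println(qualify("hdfs://namenode:9000/matrix", "file:///"))
  }
}
```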

              People

              • Assignee: Unassigned
              • Reporter: Eemil Lagerspetz (lagerspetz)
              • Votes: 1
              • Watchers: 6
