Spark / SPARK-3693

Cached Hadoop RDD always returns rows with the same value


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      While experimenting with RDD caching, we found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage:

      // Imports for the snippet (the import for CombineHiveInputClass is omitted
      // because its package is not specified in this report):
      import org.apache.hadoop.io.BytesWritable;
      import org.apache.hadoop.io.Writable;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.VoidFunction;
      import scala.Tuple2;

      public final class Test {
          public static void main(String[] args) throws Exception {
              SparkConf sparkConf = new SparkConf().setAppName("Test");
              JavaSparkContext ctx = new JavaSparkContext(sparkConf);
              ... 
              JavaPairRDD<BytesWritable, BytesWritable> input = 
                      ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, WritableComparable.class, Writable.class);
              input = input.cache();
              input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
                  @Override
                  public void call(Tuple2<BytesWritable, BytesWritable> row) throws Exception {
                      if (row._1() != null) {
                          System.out.println("Key: " + row._1());
                      }
                      if (row._2() != null) {
                          System.out.println("Value: " + row._2());
                      }
                  }
              });
              ctx.stop();
          }
      }
      

      In this case, row._2() always prints the same value. If we disable caching by removing the input.cache() call, the program prints the expected rows.

      Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) stores references to the (key, value) pairs returned by HadoopRDD.getNext() (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220). However, getNext() always returns the same (key, value) object references; each call only updates the values held inside those objects. Once there are no more records, CombineFileRecordReader fills the (key, value) objects with empty strings (no values). Because every pair buffered in MemoryStore.vector refers to the same key and value objects, all cached values end up NULL.
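
      The effect can be reproduced outside Spark. Below is a minimal, self-contained Java sketch of the same aliasing pattern; ReuseDemo, MutableValue, and the List used as a cache are illustrative stand-ins, not actual Spark or Hadoop classes:

      import java.util.ArrayList;
      import java.util.List;

      // Minimal illustration of the aliasing described above: a "reader" that reuses
      // one mutable value object per record, and a "cache" that stores references to it.
      public final class ReuseDemo {
          // Stand-in for a mutable Writable whose contents are overwritten on each read.
          static final class MutableValue {
              String contents = "";
              @Override
              public String toString() { return contents; }
          }

          public static void main(String[] args) {
              String[] records = {"row-1", "row-2", "row-3"};
              MutableValue value = new MutableValue();      // single reused instance, like HadoopRDD's value object
              List<MutableValue> cache = new ArrayList<>(); // stand-in for MemoryStore.vector

              for (String record : records) {
                  value.contents = record; // like getNext(), this updates the same object in place
                  cache.add(value);        // the cache stores a reference, not a copy
              }
              value.contents = "";         // the reader clears the object once input is exhausted

              // Every cached entry aliases the same object, so all rows print the final (empty) state.
              for (MutableValue v : cache) {
                  System.out.println("Cached value: '" + v + "'");
              }
          }
      }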

      MemoryStore should probably store a copy of each (key, value) pair rather than keep a reference to it.
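
      Until that changes, a user-side workaround is to deep-copy each pair before caching so that the cached entries no longer alias the reader's reused objects. A sketch, assuming the key and value really are BytesWritable as declared in the snippet above (needs java.util.Arrays and org.apache.spark.api.java.function.PairFunction in addition to the imports already listed):

      // Workaround sketch (user-side, not a MemoryStore fix): materialize a copy of each
      // pair before caching so the cached data is independent of the reader's reused objects.
      JavaPairRDD<BytesWritable, BytesWritable> copied = input.mapToPair(
              new PairFunction<Tuple2<BytesWritable, BytesWritable>, BytesWritable, BytesWritable>() {
                  @Override
                  public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> row) {
                      // getBytes() exposes the reused backing array; Arrays.copyOf gives the new
                      // Writable its own data, so later reads by the record reader cannot overwrite it.
                      BytesWritable key = row._1() == null ? null
                              : new BytesWritable(Arrays.copyOf(row._1().getBytes(), row._1().getLength()));
                      BytesWritable value = row._2() == null ? null
                              : new BytesWritable(Arrays.copyOf(row._2().getBytes(), row._2().getLength()));
                      return new Tuple2<BytesWritable, BytesWritable>(key, value);
                  }
              });
      copied.cache();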


    People

      Assignee: Unassigned
      Reporter: Xuefu Zhang (xuefuz)
      Votes: 0
      Watchers: 3
