Spark / SPARK-27842

Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1, 2.4.3
    • Fix Version/s: None
    • Component/s: ML, Spark Core, Windows

    Description

      Hi,

      I am using the Spark Java API in local mode (1 node, 8 cores). The Spark versions in my pom.xml are as follows:

      MLLib

      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.3.1</version>

      Core

      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.1</version>

      I get inconsistent correlation results when I start my Spark application with 8 cores versus 1, 2, or 3 cores.

      I've created a Main class that reads a list of Vectors from a file: 240 Vectors, each of length 226.

      As the code below shows, I first initialize Spark with local[*], run the correlation, save the Matrix, and stop Spark. Then I do the same with local[3].

      I expect to get the same matrix from both runs, but this is not the case. The input file is attached.
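
      The report does not identify a cause. One plausible explanation, stated here only as an assumption, is that a different core count changes how the input is split into partitions, and therefore the order in which floating-point partial sums are combined. Floating-point addition is not associative, so the same values can yield slightly different totals. The sketch below is plain Java with no Spark dependency; the class and method names are hypothetical and only illustrate the effect.

```java
public class SummationOrder {

    // Sums all values left to right, as a single partition would.
    static double sumSequential(double[] values) {
        double total = 0.0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    // Sums values[0, splitAt) and values[splitAt, length) separately,
    // then adds the two partial sums, as two partitions would.
    static double sumWithSplit(double[] values, int splitAt) {
        double left = 0.0;
        for (int i = 0; i < splitAt; i++) {
            left += values[i];
        }
        double right = 0.0;
        for (int i = splitAt; i < values.length; i++) {
            right += values[i];
        }
        return left + right;
    }

    public static void main(String[] args) {
        double[] values = {0.1, 0.2, 0.3};
        System.out.println(sumSequential(values));   // prints 0.6000000000000001
        System.out.println(sumWithSplit(values, 1)); // prints 0.6
    }
}
```

      If this is what happens inside the correlation computation, the two matrices would differ only in the last few significant digits, which is still enough to make an exact equality check fail.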

      I also tried computing the correlation with PearsonCorrelation.computeCorrelationMatrix(), but I ran into the same issue.

       

      My work depends on the resulting correlation matrix, so these inconsistent results lead to bad results in my application. As a workaround, I am currently running with local[1].

       

       

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.mllib.linalg.DenseVector;
      import org.apache.spark.mllib.linalg.Matrix;
      import org.apache.spark.mllib.linalg.Vector;
      import org.apache.spark.mllib.stat.Statistics;
      import org.apache.spark.rdd.RDD;
      import org.junit.Assert;

      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;
      import java.util.stream.Collectors;

      public class TestSparkCorr {

          private static JavaSparkContext ctx;

          public static void main(String[] args) {
              List<List<Double>> doublesLists = readInputFile();
              List<Vector> resultVectors = getVectorsList(doublesLists);

              // First run: use all available cores.
              initSpark("*");
              RDD<Vector> rddVector = ctx.parallelize(resultVectors).rdd();
              Matrix m = Statistics.corr(rddVector, "pearson");
              stopSpark();

              // Second run: same input, 3 cores.
              initSpark("3");
              RDD<Vector> rddVector3 = ctx.parallelize(resultVectors).rdd();
              Matrix m3 = Statistics.corr(rddVector3, "pearson");
              stopSpark();

              // Expected to pass, but the two matrices differ.
              Assert.assertEquals(m3, m);
          }

          // Converts each list of doubles into a DenseVector.
          private static List<Vector> getVectorsList(List<List<Double>> doublesLists) {
              List<Vector> resultVectors = new ArrayList<>(doublesLists.size());
              for (List<Double> vector : doublesLists) {
                  double[] x = new double[vector.size()];
                  for (int i = 0; i < x.length; i++) {
                      x[i] = vector.get(i);
                  }
                  resultVectors.add(new DenseVector(x));
              }
              return resultVectors;
          }

          // Reads one vector per line from the attached input file.
          private static List<List<Double>> readInputFile() {
              List<List<Double>> doublesLists = new ArrayList<>();
              try (BufferedReader reader = new BufferedReader(new FileReader(
                      ".//output//vectorList.txt"))) {
                  String line = reader.readLine();
                  while (line != null) {
                      // Drop the first character and the last two characters of the line,
                      // then split the remaining comma-separated values.
                      String[] splitLine = line.substring(1, line.length() - 2).split(",");
                      List<Double> doubles = Arrays.stream(splitLine)
                              .map(x -> Double.valueOf(x.trim()))
                              .collect(Collectors.toList());
                      doublesLists.add(doubles);
                      // read next line
                      line = reader.readLine();
                  }
              } catch (IOException e) {
                  e.printStackTrace();
              }
              return doublesLists;
          }

          // Starts a local Spark context with the given core count ("*" means all cores).
          private static void initSpark(String coresNum) {
              final SparkConf sparkConf = new SparkConf().setAppName("Span");
              sparkConf.setMaster(String.format("local[%s]", coresNum));
              sparkConf.set("spark.ui.enabled", "false");
              ctx = new JavaSparkContext(sparkConf);
          }

          private static void stopSpark() {
              ctx.stop();
              if (ctx.sc().isStopped()) {
                  ctx = null;
              }
          }

      }
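
      The reproduction above compares the two matrices with an exact Assert.assertEquals. To tell whether the mismatch is only rounding noise or something larger, a tolerance-based comparison may be more informative. The following is a minimal sketch in plain Java; the class name, method names, and the tolerance value are hypothetical and not part of Spark. It operates on raw double arrays, which in the reproduction could be obtained from Matrix.toArray() on each result.

```java
public class MatrixTolerance {

    // Returns the largest absolute element-wise difference between two arrays.
    // Positions where both values are NaN are treated as equal and skipped.
    static double maxAbsDiff(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException(
                    "size mismatch: " + a.length + " vs " + b.length);
        }
        double max = 0.0;
        for (int i = 0; i < a.length; i++) {
            if (Double.isNaN(a[i]) && Double.isNaN(b[i])) {
                continue;
            }
            // Math.max returns NaN if either argument is NaN, so a one-sided NaN
            // makes the result NaN and the comparison below fail.
            max = Math.max(max, Math.abs(a[i] - b[i]));
        }
        return max;
    }

    // True when every element differs by at most tol.
    static boolean approxEquals(double[] a, double[] b, double tol) {
        return maxAbsDiff(a, b) <= tol;
    }

    public static void main(String[] args) {
        double[] first = {1.0, 0.5, 0.5, 1.0};
        double[] second = {1.0, 0.5000000000000001, 0.5, 1.0};
        System.out.println(maxAbsDiff(first, second));
        // The tolerance 1e-9 is an arbitrary choice for illustration.
        System.out.println(approxEquals(first, second, 1e-9)); // prints true
    }
}
```

      In the reproduction, the call would look like approxEquals(m.toArray(), m3.toArray(), 1e-9). Printing maxAbsDiff for the two runs would show how large the discrepancy actually is.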
      
      

       

       

       

      Attachments

        1. vectorList.txt
          785 kB
          Peter Nijem

        Activity

          People

            Assignee: Unassigned
            Reporter: Peter Nijem (peter_presenso)
            Votes: 0
            Watchers: 5
