Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.3.1, 2.4.3
- Fix Version/s: None
Description
Hi,
I am working with the Spark Java API in local mode (1 node, 8 cores). The Spark version is declared in my pom.xml as follows:

MLlib:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

Core:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
I am seeing inconsistent correlation results when starting my Spark application with 8 cores versus 1, 2, or 3 cores.
I've created a Main class which reads a list of vectors from a file: 240 vectors, each of length 226.
As you can see in the code below, I first initialize Spark with local[*], run the correlation, save the resulting Matrix, and stop Spark. Then I do the same with local[3].
I expect to get the same matrix from both runs, but this is not the case. The input file is attached.
I also tried computing the correlation with PearsonCorrelation.computeCorrelationMatrix() directly, but I hit the same issue there as well; a sketch of that call follows.
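For reference, the direct call looked roughly like this (a sketch; the class name is just for illustration, and I am assuming the internal org.apache.spark.mllib.stat.correlation.PearsonCorrelation object is reachable from Java, since Scala's private[stat] modifier is not enforced at the bytecode level):

import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.stat.correlation.PearsonCorrelation;
import org.apache.spark.rdd.RDD;

public class DirectPearsonCorr {
    // Sketch: bypass Statistics.corr and call the internal implementation directly.
    static Matrix correlate(RDD<Vector> vectors) {
        return PearsonCorrelation.computeCorrelationMatrix(vectors);
    }
}

This produced the same core-count-dependent results as Statistics.corr.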
My work depends on the resulting correlation matrix, so these inconsistent results degrade my application's output. As a workaround, I am running with local[1] for now. Here is the full reproduction:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.rdd.RDD;
import org.junit.Assert;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TestSparkCorr {

    private static JavaSparkContext ctx;

    public static void main(String[] args) {
        List<List<Double>> doublesLists = readInputFile();
        List<Vector> resultVectors = getVectorsList(doublesLists);
        //===========================================================================
        initSpark("*");
        RDD<Vector> RDD_vector = ctx.parallelize(resultVectors).rdd();
        Matrix m = Statistics.corr(RDD_vector, "pearson");
        stopSpark();
        //===========================================================================
        initSpark("3");
        RDD<Vector> RDD_vector_3 = ctx.parallelize(resultVectors).rdd();
        Matrix m3 = Statistics.corr(RDD_vector_3, "pearson");
        stopSpark();
        //===========================================================================
        Assert.assertEquals(m3, m);
    }

    private static List<Vector> getVectorsList(List<List<Double>> doublesLists) {
        List<Vector> resultVectors = new ArrayList<>(doublesLists.size());
        for (List<Double> vector : doublesLists) {
            double[] x = new double[vector.size()];
            for (int i = 0; i < x.length; i++) {
                x[i] = vector.get(i);
            }
            resultVectors.add(new DenseVector(x));
        }
        return resultVectors;
    }

    private static List<List<Double>> readInputFile() {
        List<List<Double>> doublesLists = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(
                ".//output//vectorList.txt"))) {
            String line = reader.readLine();
            while (line != null) {
                // Strip the enclosing brackets from the line before splitting on commas.
                String[] splitLine = line.substring(1, line.length() - 2).split(",");
                List<Double> doubles = Arrays.stream(splitLine)
                        .map(x -> Double.valueOf(x.trim()))
                        .collect(Collectors.toList());
                doublesLists.add(doubles);
                // read next line
                line = reader.readLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return doublesLists;
    }

    private static void initSpark(String coresNum) {
        final SparkConf sparkConf = new SparkConf().setAppName("Span");
        sparkConf.setMaster(String.format("local[%s]", coresNum));
        sparkConf.set("spark.ui.enabled", "false");
        ctx = new JavaSparkContext(sparkConf);
    }

    private static void stopSpark() {
        ctx.stop();
        if (ctx.sc().isStopped()) {
            ctx = null;
        }
    }
}
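For what it's worth, my assumption (not confirmed against the Spark internals) is that this comes from double-precision addition not being associative: with a different number of cores the data is split into a different number of partitions, so the per-partition partial sums behind the means and covariances are combined in a different order, which can change the low-order bits of the result. A minimal illustration of the effect, independent of Spark and using hypothetical values:

public class FpOrderDemo {
    public static void main(String[] args) {
        // Hypothetical values chosen so that rounding makes the grouping visible.
        double a = 1e16, b = -1e16, c = 1.0;
        double leftToRight = (a + b) + c; // 0.0 + 1.0 = 1.0
        double rightToLeft = a + (b + c); // 1.0 is absorbed into -1e16, so this is 0.0
        System.out.println(leftToRight == rightToLeft); // prints false
    }
}

I do not know whether rounding alone fully accounts for the differences I observe, which is why I am filing this.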