Details
-
Task
-
Status: Resolved
-
Major
-
Resolution: Invalid
-
2.2.0
-
None
-
None
Description
Hi,
I am facing an issue when we run the Spark KMeans algorithm in parallel by sending the sample RDD to the executors.
The KMeans run fails on the executor when we pass the cluster sample as an RDD
(RDD[linalg.Vector]) to the executors. It fails because the RDD[linalg.Vector] is unavailable on the executor side.
Can we pass an RDD to the executors to make KMeans run in parallel?
Please suggest how we can run KMeans in parallel on the executors.
Please find the code snippet and the error from the logs below.
Regards,
Raman.
Code snippet
Driver side code ::
val kmeansCluster = sc.parallelize(List.range(kStart, kEnd + 1)).map(k => {
val sharedContext = SharedClusteringData[linalg.Vector,KMeansModel](job, spark, sampleId, Some(k),
ClusteringType.KMEANS.name() + "clustering processes for:" + k)
//val sharedContextLoadSamplesCount = sharedContextLoadSample.clusterSample.get
//log.info(s"cluster sample count is ${sharedContextLoadSamplesCount.count()}")
sharedContext.selectedFeatureIdx = Some(loadSample.value.selectedFeatureIdx.get)
sharedContext.dropColIdx = Some(loadSample.value.dropColIdx.get)
sharedContext.dataset = loadSample.value.dataset)
sharedContext.clusterSample= loadSample.value.clusterSample
println("In Driver program :::")
sharedContext.clusterSample.foreach(x=>println)
println("In Driver program END :::")
RunClustering.runKMean(sharedContext) match {
case Success(true) =>
log.info(s"${ClusteringType.KMEANS.name()} is completed for k =$k ")
case Success(false) =>
log.error(s"${ClusteringType.KMEANS.name()} is failed for k = $k")
case Failure(ex) =>
log.error(s"${ClusteringType.KMEANS.name} clustering failed for $k")
log.error(ex.getStackTrace.mkString("\n"))
}
(k, sharedContext.isSuccessful, sharedContext.message)
})
Executor side
def buildCluster[S, M](k: Int, clusterSample: RDD[S], maxIteration: Int): Try[M] =
{ Try(KMeans.train(kmeanSample, k, maxIteration).asInstanceOf[M]) }
Logs::
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) org.apache.spark.rdd.RDD.count(RDD.scala:1158) com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33) com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11) scala.util.Try$.apply(Try.scala:192) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11) com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81) com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69) scala.collection.Iterator$$anon$11.next(Iterator.scala:409) scala.collection.Iterator$class.foreach(Iterator.scala:893) scala.collection.AbstractIterator.foreach(Iterator.scala:1336) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) scala.collection.AbstractIterator.to(Iterator.scala:1336) scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) scala.collection.AbstractIterator.toArray(Iterator.scala:1336) org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) org.apache.spark.scheduler.Task.run(Task.scala:108) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) 2019-07-05 12:10:24,862 ERROR [Executor task launch worker for task 449] clusteringprocessor.ClusterProcessor: org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) org.apache.spark.rdd.RDD.count(RDD.scala:1158) com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33) com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11) scala.util.Try$.apply(Try.scala:192) com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11) com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81) com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69) scala.collection.Iterator$$anon$11.next(Iterator.scala:409) scala.collection.Iterator$class.foreach(Iterator.scala:893) scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) scala.collection.AbstractIterator.to(Iterator.scala:1336) scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) scala.collection.AbstractIterator.toArray(Iterator.scala:1336) org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) org.apache.spark.scheduler.Task.run(Task.scala:108) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)