Spark / SPARK-28300

KMeans fails when run in parallel by passing an RDD to executors


    Details

    • Type: Task
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: ML
    • Labels:
      None

      Description

      Hi,

      I am facing an issue when running the Spark KMeans algorithm in parallel by sending a sample RDD to the executors.

      The KMeans run fails on the executor when we pass the cluster sample as an RDD (RDD[linalg.Vector]) to the executors. It fails because the RDD[linalg.Vector] is unavailable on the executor side.

      Can we pass an RDD to an executor so that KMeans runs in parallel?

      Please suggest how to achieve running KMeans in parallel on the executors.

      Please find the code snippet and the error logs below.

      Regards,

      Raman.

      Code snippet

      Driver side code ::

      val kmeansCluster = sc.parallelize(List.range(kStart, kEnd + 1)).map(k => {
        val sharedContext = SharedClusteringData[linalg.Vector, KMeansModel](job, spark, sampleId, Some(k),
          ClusteringType.KMEANS.name() + "clustering processes for:" + k)
        //val sharedContextLoadSamplesCount = sharedContextLoadSample.clusterSample.get
        //log.info(s"cluster sample count is ${sharedContextLoadSamplesCount.count()}")
        sharedContext.selectedFeatureIdx = Some(loadSample.value.selectedFeatureIdx.get)
        sharedContext.dropColIdx = Some(loadSample.value.dropColIdx.get)
        sharedContext.dataset = loadSample.value.dataset
        sharedContext.clusterSample = loadSample.value.clusterSample
        println("In Driver program :::")
        sharedContext.clusterSample.foreach(x => println(x))
        println("In Driver program END :::")
        RunClustering.runKMean(sharedContext) match {
          case Success(true) =>
            log.info(s"${ClusteringType.KMEANS.name()} is completed for k = $k")
          case Success(false) =>
            log.error(s"${ClusteringType.KMEANS.name()} is failed for k = $k")
          case Failure(ex) =>
            log.error(s"${ClusteringType.KMEANS.name()} clustering failed for $k")
            log.error(ex.getStackTrace.mkString("\n"))
        }
        (k, sharedContext.isSuccessful, sharedContext.message)
      })

      Executor side code ::

      def buildCluster[S, M](k: Int, clusterSample: RDD[S], maxIteration: Int): Try[M] =
        Try(KMeans.train(clusterSample, k, maxIteration).asInstanceOf[M])
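The failure above is expected: an RDD is a driver-side handle, and RDD operations (such as the `count` in the stack trace below) can only be invoked from the driver, not from inside a task running on an executor — which is what happens when `runKMean` is called inside `sc.parallelize(...).map`. A common workaround is to keep the RDD on the driver and launch one Spark job per value of k from concurrent driver-side threads; Spark's scheduler can run those jobs concurrently. The sketch below shows only the driver-side concurrency pattern, with a hypothetical `trainStub` standing in for the real `KMeans.train(clusterSample, k, maxIteration)` call, so it runs without Spark dependencies:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ParallelKMeansSketch {
  // Hypothetical stand-in for KMeans.train(clusterSample, k, maxIteration).
  // In real code this is a driver-side call: it launches a Spark job whose
  // tasks run on executors, while the RDD handle itself stays on the driver.
  def trainStub(k: Int): Double = 100.0 / k // pretend training "cost" for k clusters

  def main(args: Array[String]): Unit = {
    val kRange = 2 to 5
    // One Future per k, each on a driver-side thread. The RDD never needs to
    // be serialized to an executor; only the resulting model/metrics come back.
    val futures = kRange.map { k =>
      Future(Try(trainStub(k)).toEither.map(cost => (k, cost)))
    }
    val results = Await.result(Future.sequence(futures), 30.seconds)
    results.foreach {
      case Right((k, cost)) => println(s"k=$k cost=$cost")
      case Left(ex)         => println(s"training failed: ${ex.getMessage}")
    }
  }
}
```

With the real `KMeans.train` in place of `trainStub`, each Future submits an independent Spark job over the shared `clusterSample` RDD, so the runs for different k values overlap instead of being serialized — without ever shipping the RDD into a task closure.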

      Logs::

       

      org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
      org.apache.spark.rdd.RDD.count(RDD.scala:1158)
      com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
      com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
      com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
      com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
      com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
      scala.util.Try$.apply(Try.scala:192)
      com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
      com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
      com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
      scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      scala.collection.Iterator$class.foreach(Iterator.scala:893)
      scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
      scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
      scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      scala.collection.AbstractIterator.to(Iterator.scala:1336)
      scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
      scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
      scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
      scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
      org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
      org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
      org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
      org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
      org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      org.apache.spark.scheduler.Task.run(Task.scala:108)
      org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      java.lang.Thread.run(Thread.java:748)

      2019-07-05 12:10:24,862 ERROR [Executor task launch worker for task 449] clusteringprocessor.ClusterProcessor: (same stack trace repeated)

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
            • Reporter:
              itsram08 Ramanjaneya Naidu Nalla
            • Votes:
              0
            • Watchers:
              2

              Dates

              • Created:
                Updated:
                Resolved: