Spark / SPARK-29872

Improper cache strategy in examples

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Examples
    • Labels: None

    Description

      1. Improper caching in examples.SparkTC
      The RDD edges should be cached because it is used in every iteration of the while loop. It should then be unpersisted before the final action tc.count(), because by that point tc itself is persisted and edges is no longer needed.
      In addition, each iteration caches a new tc RDD but never unpersists the previous one, which wastes memory.

          val edges = tc.map(x => (x._2, x._1)) // Edges should be cached
          // This join is iterated until a fixed point is reached.
          var oldCount = 0L
          var nextCount = tc.count()
          do { 
            oldCount = nextCount
            // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
            // then project the result to obtain the new (x, z) paths.
            tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
            nextCount = tc.count()
          } while (nextCount != oldCount)
          println(s"TC has ${tc.count()} edges.")
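
One way to address both points is sketched below. This is an illustrative rewrite, not the change adopted in Spark (the issue was resolved as Won't Fix). The object name `SparkTCCached`, the helper `transitiveClosure`, and the three-edge input graph are assumptions made for the example; only `cache()`, `unpersist()`, `join`, `union`, `distinct`, and `count` are taken from the RDD API.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical variant of the SparkTC loop with explicit cache management.
object SparkTCCached {
  def transitiveClosure(graph: RDD[(Int, Int)]): RDD[(Int, Int)] = {
    var tc: RDD[(Int, Int)] = graph.cache()
    // The reversed edges are joined against in every iteration, so cache them once.
    val edges = tc.map(x => (x._2, x._1)).cache()

    var oldCount = 0L
    var nextCount = tc.count()
    do {
      oldCount = nextCount
      val previous = tc
      // Perform the join, obtaining (y, (z, x)) pairs, then project to (x, z) paths.
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      // count() materializes the new tc while the previous one is still cached.
      nextCount = tc.count()
      // The previous generation is no longer needed once the new one is cached.
      previous.unpersist()
    } while (nextCount != oldCount)

    // edges is only needed inside the loop; release it before further actions on tc.
    edges.unpersist()
    tc
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SparkTCCached").master("local[*]").getOrCreate()
    // A small chain 1 -> 2 -> 3 -> 4, used here only as sample input.
    val graph = spark.sparkContext.parallelize(Seq((1, 2), (2, 3), (3, 4)))
    val tc = transitiveClosure(graph)
    println(s"TC has ${tc.count()} edges.")
    spark.stop()
  }
}
```

Unpersisting `previous` only after `tc.count()` keeps the old data available while the new RDD is being computed, so no iteration has to recompute its input from scratch.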
      

      2. Cache needed in examples.ml.LogisticRegressionSummary
      The DataFrame fMeasure should be cached because two actions are run on it (the two head() calls below).

          // Set the model threshold to maximize F-Measure
          val fMeasure = trainingSummary.fMeasureByThreshold // fMeasure should be cached
          val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
          val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
            .select("threshold").head().getDouble(0)
          lrModel.setThreshold(bestThreshold)
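
A minimal sketch of the suggested caching follows. The object `CachedFMeasure`, the helper `bestThreshold`, and the hand-built three-row DataFrame are assumptions for illustration; in the real example the input would be `trainingSummary.fMeasureByThreshold`, which has the same two columns (`threshold`, `F-Measure`).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

// Hypothetical helper: picks the threshold with the highest F-Measure.
object CachedFMeasure {
  def bestThreshold(fMeasure: DataFrame): Double = {
    // Two actions (the two head() calls) read fMeasure, so cache it first.
    fMeasure.cache()
    try {
      val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
      fMeasure.where(col("F-Measure") === maxFMeasure)
        .select("threshold").head().getDouble(0)
    } finally {
      // Release the cached data once both lookups are done.
      fMeasure.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CachedFMeasure").master("local[*]").getOrCreate()
    import spark.implicits._
    // Stand-in for trainingSummary.fMeasureByThreshold (made-up values).
    val fMeasure = Seq((0.1, 0.60), (0.5, 0.85), (0.9, 0.70)).toDF("threshold", "F-Measure")
    println(s"Best threshold: ${bestThreshold(fMeasure)}")
    spark.stop()
  }
}
```

The returned value could then be passed to `lrModel.setThreshold(...)` as in the original snippet.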
      

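      3. Cache needed in examples.sql.SparkSQLExample
      The RDD behind peopleDF should be cached because teenagersDF, which is derived from it, runs three actions (two show() calls and one collect()).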

          val peopleDF = spark.sparkContext
            .textFile("examples/src/main/resources/people.txt")
            .map(_.split(","))
            .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cached
            .toDF()
          // Register the DataFrame as a temporary view
          peopleDF.createOrReplaceTempView("people")
          val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
          teenagersDF.map(teenager => "Name: " + teenager(0)).show()
          teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
          implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
          teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
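
The following sketch shows where the cache could go. The object `CachedPeopleExample`, the helper `teenagerNames`, and the in-memory `lines` (standing in for the contents of `people.txt`) are assumptions made so that the example is self-contained; `collect()` replaces `show()` so that the results can be inspected.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical variant of the SparkSQLExample snippet with the parsed RDD cached.
object CachedPeopleExample {
  case class Person(name: String, age: Long)

  def teenagerNames(spark: SparkSession, lines: Seq[String]): (Array[String], Array[String]) = {
    import spark.implicits._
    val peopleRDD = spark.sparkContext
      .parallelize(lines)
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .cache() // reused by every action on teenagersDF below
    val peopleDF = peopleRDD.toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // Each collect() is a separate action; without the cache each one re-parses the input.
    val byIndex = teenagersDF.map(teenager => "Name: " + teenager(0)).collect()
    val byName = teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).collect()
    // Release the cached data once all actions have run.
    peopleRDD.unpersist()
    (byIndex, byName)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CachedPeopleExample").master("local[*]").getOrCreate()
    // Sample lines in the "name, age" format that the snippet parses.
    val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
    val (byIndex, byName) = teenagerNames(spark, lines)
    byIndex.foreach(println)
    byName.foreach(println)
    spark.stop()
  }
}
```

Caching the DataFrame with `peopleDF.cache()` instead would be another option; the sketch caches the RDD because that is the object the report points at.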
      

      This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.

    People

    • Assignee: Unassigned
    • Reporter: IcySanwitch (spark_cachecheck)