Spark / SPARK-28062

HuberAggregator copies coefficients vector every time an instance is added


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: ML
    • Labels: None

      Description

      Every time an instance is added to the HuberAggregator, a copy of the coefficients vector is created via `bcParameters.value.toArray.slice(0, numFeatures)` (see the code snippet below). This causes performance degradation, which is particularly severe when the instances have long sparse feature vectors.

      def add(instance: Instance): HuberAggregator = {
        instance match { case Instance(label, weight, features) =>
          require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
            s" Expecting $numFeatures but got ${features.size}.")
          require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

          if (weight == 0.0) return this
          val localFeaturesStd = bcFeaturesStd.value
          val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
          val localGradientSumArray = gradientSumArray

          // Snip
        }
      }

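      To see why that copy hurts, here is a small Python sketch (illustrative only, not Spark code): materializing a fresh dense array from the broadcast coefficients on every `add` makes the per-instance cost O(numFeatures), even when the instance itself is a sparse vector with only a handful of active features.

      ```python
      import numpy as np

      NUM_FEATURES = 100_000  # long coefficient vector, as in the one-hot example below

      coefficients = np.random.rand(NUM_FEATURES)  # stands in for bcParameters.value

      def add_with_copy(indices, values):
          """Mimics the current add(): dense copy of the coefficients per instance."""
          local = coefficients[:NUM_FEATURES].copy()  # O(NUM_FEATURES) work per call
          return float(local[indices] @ values)

      def add_without_copy(indices, values):
          """Reuses the already-materialized array: O(nnz) work per call."""
          return float(coefficients[indices] @ values)

      # A sparse instance with a single active feature, like a one-hot encoding.
      idx, val = np.array([42]), np.array([1.0])
      assert add_with_copy(idx, val) == add_without_copy(idx, val)
      ```

      Both variants compute the same dot product; only the per-instance work differs.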

      The LeastSquaresAggregator class avoids this performance issue by storing such reused values in @transient lazy vals, so they are materialized once per task rather than once per instance. Applying a similar approach to HuberAggregator gives a significant speed boost. Running the script below locally on my machine gives the following timing results:

      Current implementation: 
          Time(s): 540.1439919471741
          Iterations: 26
          Intercept: 0.518109382890512
          Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]
      
      Modified implementation to match LeastSquaresAggregator:
          Time(s): 46.82946586608887
          Iterations: 26
          Intercept: 0.5181093828893774
          Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
      
      from random import random, randint, seed
      import time
      
      from pyspark.ml.feature import OneHotEncoder
      from pyspark.ml.regression import LinearRegression
      from pyspark.sql import SparkSession
      
      seed(0)
      
      spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
      df = spark.createDataFrame([[randint(0, 100000), random()] for _ in range(100000)], ["category", "target"])
      ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
      lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)
      
      start = time.time()
      model = lr.fit(ohe.transform(df))
      end = time.time()
      
      print("Time(s): " + str(end - start))
      print("Iterations: " + str(model.summary.totalIterations))
      print("Intercept: " + str(model.intercept))
      print("Coefficients: " + str(list(model.coefficients)[0:10]))
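      The caching pattern borrowed from LeastSquaresAggregator can be sketched in Python as follows (all names here are hypothetical stand-ins, not Spark's API; the real fix uses Scala @transient lazy vals): the broadcast value is dereferenced and sliced at most once per aggregator, then reused by every subsequent `add`.

      ```python
      class HuberAggregatorSketch:
          """Illustrative analogue of the @transient lazy val pattern.

          `bc_parameters` stands in for a Spark broadcast variable's .value;
          this class is a sketch of the idea, not Spark code.
          """

          def __init__(self, num_features, bc_parameters):
              self.num_features = num_features
              self._bc_parameters = bc_parameters
              self._coefficients = None  # filled lazily, like a @transient lazy val

          @property
          def coefficients(self):
              # Materialized once per aggregator (once per task in Spark), then reused.
              if self._coefficients is None:
                  self._coefficients = list(self._bc_parameters())[: self.num_features]
              return self._coefficients

          def add(self, features):
              coef = self.coefficients  # no per-instance copy or slice
              return sum(coef[i] * v for i, v in features)


      agg = HuberAggregatorSketch(4, lambda: [0.1, 0.2, 0.3, 0.4, 9.9])
      agg.add([(1, 1.0)])
      first = agg.coefficients
      agg.add([(3, 2.0)])
      assert agg.coefficients is first  # cached, not re-copied per add
      ```

      In the Scala fix, the same effect comes from marking the dereferenced arrays @transient lazy, so they are recomputed once on each executor after deserialization instead of once per added instance.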
      

      People

      • Assignee: Andrew-C (Andrew Crosby)
      • Reporter: Andrew-C (Andrew Crosby)
