Spark / SPARK-22216 Improving PySpark/Pandas interoperability / SPARK-23011

Support alternative function form with group aggregate pandas UDF


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: PySpark
    • Labels: None

    Description

      The current semantics of groupby apply are that the output schema of groupby apply is the same as the output schema of the UDF. Because grouping columns are usually useful to users, users often need to output the grouping columns from within the UDF. To illustrate, consider the following example:

      import pandas as pd
      import statsmodels.api as sm
      from pyspark.sql.functions import pandas_udf, PandasUDFType

      # df has four columns: id, y, x1, x2

      group_column = 'id'
      y_column = 'y'
      x_columns = ['x1', 'x2']
      schema = df.select(group_column, *x_columns).schema

      # Input/output are both a pandas.DataFrame
      @pandas_udf(schema, PandasUDFType.GROUP_MAP)
      def ols(pdf):
          group_key = pdf[group_column].iloc[0]
          y = pdf[y_column]
          X = pdf[x_columns]
          X = sm.add_constant(X)
          model = sm.OLS(y, X).fit()

          return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                              columns=[group_column] + x_columns)

      beta = df.groupby(group_column).apply(ols)
      

      Although the UDF (linear regression) has nothing to do with the grouping column, the user still needs to handle the grouping column inside the UDF. In other words, the UDF is tightly coupled with the grouping column.

       

      Following the discussion in https://github.com/apache/spark/pull/20211#discussion_r160524679, we reached consensus on supporting an alternative function form:

      def foo(key, pdf):
          key  # this is the grouping key
          pdf  # this is the pandas.DataFrame

      pudf = pandas_udf(f=foo, returnType="id int, v double", functionType=PandasUDFType.GROUP_MAP)
      df.groupby(group_column).apply(pudf)

      The OLS example above can then be written without referencing the grouping column inside the UDF:
      import pandas as pd
      import statsmodels.api as sm
      from pyspark.sql.functions import pandas_udf, PandasUDFType

      # df has four columns: id, y, x1, x2

      group_column = 'id'
      y_column = 'y'
      x_columns = ['x1', 'x2']
      schema = df.select(group_column, *x_columns).schema

      # Input/output are both a pandas.DataFrame
      @pandas_udf(schema, PandasUDFType.GROUP_MAP)
      def ols(key, pdf):
          y = pdf[y_column]
          X = pdf[x_columns]
          X = sm.add_constant(X)
          model = sm.OLS(y, X).fit()

          # key is a tuple, so convert it to a list before concatenating
          return pd.DataFrame([list(key) + [model.params[i] for i in x_columns]])

      beta = df.groupby(group_column).apply(ols)
      

       

      In summary:

      • Support the alternative form f(key, pdf). The current form f(pdf) will still be supported (detected through function inspection).
      • In both cases, the UDF output schema will be the final output schema of the Spark DataFrame.
      • The key will be passed to the user as a Python tuple.
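      The "function inspection" mentioned in the first bullet can be sketched roughly as follows. This is an illustration only, under the assumption that dispatch is done on the function's positional arity; `positional_arity` and `apply_grouped_map` are hypothetical names, not Spark's internals:

```python
import inspect

def positional_arity(func):
    # Count positional parameters to distinguish f(pdf) from f(key, pdf).
    params = inspect.signature(func).parameters.values()
    return sum(1 for p in params
               if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD))

def apply_grouped_map(func, key, pdf):
    # Dispatch on arity: a one-argument UDF gets only the DataFrame,
    # a two-argument UDF also receives the grouping key as a tuple.
    if positional_arity(func) == 1:
        return func(pdf)
    return func(key, pdf)
```

      Because both forms go through the same dispatcher, existing one-argument UDFs keep working unchanged while new UDFs can opt in to receiving the key.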

People

    Assignee: Li Jin (icexelloss)
    Reporter: Li Jin (icexelloss)
