SPARK-28502

Error with struct conversion while using pandas_udf


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.4.3
    • Fix Version/s: 3.0.0
    • Component/s: PySpark
    • Labels: None
    • Environment: OS: Ubuntu; Python: 3.6

    Description

      What I am trying to do: group data by time interval (e.g., a 15-day window) and perform some operations on each group's dataframe using pandas UDFs. I don't know if there is a better or cleaner way to do it.

      Below is the sample code I tried and the error message I am getting.

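      from pyspark.sql import SparkSession, functions as F
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

      # Reuse the active session, or create one if none exists.
      sparkSession = SparkSession.builder.getOrCreate()

      df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),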
                                  (13.00, "2018-03-11T12:27:18+00:00"),
                                  (25.00, "2018-03-12T11:27:18+00:00"),
                                  (20.00, "2018-03-13T15:27:18+00:00"),
                                  (17.00, "2018-03-14T12:27:18+00:00"),
                                  (99.00, "2018-03-15T11:27:18+00:00"),
                                  (156.00, "2018-03-22T11:27:18+00:00"),
                                  (17.00, "2018-03-31T11:27:18+00:00"),
                                  (25.00, "2018-03-15T11:27:18+00:00"),
                                  (25.00, "2018-03-16T11:27:18+00:00")
                                  ],
                                 ["id", "ts"])
      df = df.withColumn('ts', df.ts.cast('timestamp'))
      
      schema = StructType([
          StructField("id", IntegerType()),
          StructField("ts", TimestampType())
      ])
      
      
      @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
      def some_udf(df):
          # some computation
          return df
      
      df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
      

      This throws the following exception:

      TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
      

      However, if I use a built-in aggregation method instead, it works fine. For example:

      df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
      

      Output

      +-----+------------------------------------------+-------+
      |id   |window                                    |avg(id)|
      +-----+------------------------------------------+-------+
      |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
      |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
      |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
      |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
      |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
      |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
      |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
      +-----+------------------------------------------+-------+
      
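The window boundaries in the output above are consistent with 15-day tumbling windows aligned to the Unix epoch in UTC. The following is a minimal plain-Python sketch of that bucketing, not Spark's implementation; the helper name `tumbling_window` and the UTC alignment are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: windows are aligned to the Unix epoch in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, width=timedelta(days=15)):
    """Return the [start, end) window of the given width that contains ts."""
    # Number of whole windows elapsed since the epoch (floor division).
    index = (ts - EPOCH) // width
    start = EPOCH + index * width
    return start, start + width


# First timestamp from the sample data in this report.
ts = datetime(2018, 3, 10, 15, 27, 18, tzinfo=timezone.utc)
start, end = tumbling_window(ts)
print(start.date(), end.date())  # 2018-03-05 2018-03-20
```

Under the same assumption, the 2018-03-22 and 2018-03-31 rows fall into the window starting 2018-03-20, which matches the table.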

            People

              Assignee: Unassigned
              Reporter: Nasir Ali (nasirali)
              Votes: 0
              Watchers: 4
