  Spark / SPARK-47718

.sql() does not recognize watermark defined upstream


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      I have a data pipeline that reads data from a Kafka source, transforms it with PySpark, and then writes the output to a sink (Kafka, Redis, etc.).

      My entire pipeline is written in SQL, so I want to use the .sql() method to run SQL directly against my streaming source.

      However, the watermark I define on the streaming DataFrame with withWatermark() is not recognized by the downstream aggregation query that I run through .sql().

      ```
      Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) [Clang 16.0.6 ] on darwin
      Type "help", "copyright", "credits" or "license" for more information.
      >>> import pyspark
      >>> print(pyspark.__version__)
      3.5.1
      >>> from pyspark.sql import SparkSession
      >>>
      >>> session = SparkSession.builder \
      ...     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
      ...     .getOrCreate()
      >>> from pyspark.sql.functions import col, from_json
      >>> from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType
      >>> schema = StructType(
      ...     [
      ...         StructField('createTime', TimestampType(), True),
      ...         StructField('orderId', LongType(), True),
      ...         StructField('payAmount', DoubleType(), True),
      ...         StructField('payPlatform', IntegerType(), True),
      ...         StructField('provinceId', IntegerType(), True),
      ...     ])
      >>>
      >>> streaming_df = session.readStream\
      ...     .format("kafka")\
      ...     .option("kafka.bootstrap.servers", "localhost:9092")\
      ...     .option("subscribe", "payment_msg")\
      ...     .option("startingOffsets","earliest")\
      ...     .load()\
      ...     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
      ...     .select("parsed_value.*")\
      ...     .withWatermark("createTime", "10 seconds")
      >>>
      >>> streaming_df.createOrReplaceTempView("streaming_df")
      >>> session.sql("""
      ... SELECT
      ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
      ...     FROM streaming_df
      ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
      ...     ORDER BY window.start
      ... """)\
      ...   .writeStream\
      ...   .format("kafka") \
      ...   .option("checkpointLocation", "checkpoint") \
      ...   .option("kafka.bootstrap.servers", "localhost:9092") \
      ...   .option("topic", "sink") \
      ...   .start()
      ```

      This throws the following exception:
      ```
      pyspark.errors.exceptions.captured.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; line 6 pos 4;
      ```


    People

      • Assignee: Unassigned
      • Reporter: Chloe He (chloehe)
      • Votes: 0
      • Watchers: 2
