  1. Spark
  2. SPARK-34510

.foreachPartition command hangs when run inside a Python package but works when run from a Python file outside the package on EMR

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: EC2, PySpark
    • Labels: None

    Description

      I'm running PySpark 3.0.0 on EMR with the project structure below. process.py controls the flow of the application and calls code inside the file_processor package. The job hangs when the .foreachPartition code located in s3_repo.py is called from process.py. When the same .foreachPartition code is moved out of s3_repo.py and placed inside process.py, it runs fine.

      process.py
      file_processor
        config        
          spark.py
        repository        
          s3_repo.py
        structure        
          table_creator.py
      
      

      process.py

      from file_processor.structure import table_creator
      from file_processor.repository import s3_repo
      
      def process():
          table_creator.create_table()
          s3_repo.save_to_s3()
      
      if __name__ == '__main__':
          process()
      

      spark.py

      from pyspark.sql import SparkSession
      spark_session = SparkSession.builder.appName("Test").getOrCreate()
      

      s3_repo.py 

      from file_processor.config.spark import spark_session
      
      def save_to_s3():
          spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)
      
      def _save_to_s3(iterator):   
          for record in iterator:
              print(record)
      

      table_creator.py

      from file_processor.config.spark import spark_session
      from pyspark.sql import Row
      
      def create_table():
          file_contents = [
              {'line_num': 1, 'contents': 'line 1'},
              {'line_num': 2, 'contents': 'line 2'},
              {'line_num': 3, 'contents': 'line 3'}        
          ]
          spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")
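
      The report does not identify a cause. One possible explanation, offered as an assumption rather than a confirmed diagnosis: PySpark serializes the function passed to .foreachPartition with cloudpickle, which, as far as I know, pickles functions defined in __main__ by value but functions from importable modules by reference. If that applies here, each executor would import file_processor.repository.s3_repo to resolve _save_to_s3, and that import would also run the module-level SparkSession.builder...getOrCreate() in spark.py on the worker. The sketch below does not use Spark at all. It uses only the standard-library pickle module to show the underlying mechanism: unpickling a function stored by reference re-imports its module and re-runs that module's top-level code. The names demo_repo, handle_partition and DEMO_IMPORT_COUNT are hypothetical stand-ins, not part of the attached project.

```python
import importlib
import os
import pickle
import sys
import tempfile
import textwrap

# Hypothetical stand-in for a module such as s3_repo.py. Its top-level code has
# a side effect (bumping a counter); in the report the analogous side effect
# would be building a SparkSession at import time.
MODULE_SOURCE = textwrap.dedent(
    """
    import os

    # Runs every time this module is freshly imported.
    os.environ["DEMO_IMPORT_COUNT"] = str(
        int(os.environ.get("DEMO_IMPORT_COUNT", "0")) + 1
    )

    def handle_partition(iterator):
        return [record for record in iterator]
    """
)


def demo():
    """Return (pickled_by_reference, import_count, result_of_restored_function)."""
    os.environ.pop("DEMO_IMPORT_COUNT", None)
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "demo_repo.py"), "w") as fh:
            fh.write(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            # First import: plays the role of the driver importing the package.
            demo_repo = importlib.import_module("demo_repo")
            payload = pickle.dumps(demo_repo.handle_partition)
            # The payload stores the module and function names, not the code.
            by_reference = b"demo_repo" in payload

            # Simulate a fresh worker process that has not imported the module.
            del sys.modules["demo_repo"]
            restored = pickle.loads(payload)  # triggers a second import

            count = int(os.environ["DEMO_IMPORT_COUNT"])
            result = restored(iter(["a", "b"]))
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("demo_repo", None)
            os.environ.pop("DEMO_IMPORT_COUNT", None)
    return by_reference, count, result


if __name__ == "__main__":
    print(demo())
```

      The import counter ends at 2, which shows that unpickling executed the module's top-level code a second time. If the same thing happens on the EMR executors, one workaround to try would be creating the session lazily inside a function in spark.py instead of at import time. That suggestion is untested against this report.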
      

      Attachments

        1. Code.zip (3 kB, attached by Yuriy)

          People

            Assignee: Unassigned
            Reporter: Lapidus Yuriy