SPARK-30966

spark.createDataFrame fails with pandas DataFrame including pandas.NA

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.5
    • Fix Version/s: None
    • Component/s: PySpark

    Description

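      pandas 1.0.0 introduced pandas.NA, and spark.createDataFrame fails on a pandas DataFrame that contains it. With Arrow enabled, the Arrow conversion fails first ("Did not pass numpy.dtype object"), and the non-Arrow fallback then raises a TypeError during schema inference: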

      In [5]: from pyspark.sql import SparkSession
      
      In [6]: spark = SparkSession.builder.getOrCreate()
      
      In [7]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")
      
      In [8]: import numpy as np
         ...: import pandas as pd
      
      In [12]: pdf = pd.DataFrame(data=[{'a':1,'b':2}, {'a':3,'b':4,'c':5}], dtype=pd.Int64Dtype())
      
      In [16]: pdf
      Out[16]:
         a  b     c
      0  1  2  <NA>
      1  3  4     5
      
      In [13]: sdf = spark.createDataFrame(pdf)
      /Users/ariga/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
        Did not pass numpy.dtype object
      Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
        warnings.warn(msg)
      ---------------------------------------------------------------------------
      TypeError                                 Traceback (most recent call last)
      <ipython-input-13-ad13ba53b87e> in <module>
      ----> 1 sdf = spark.createDataFrame(df2)
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
          746             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
          747         else:
      --> 748             rdd, schema = self._createFromLocal(map(prepare, data), schema)
          749         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
          750         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
          414
          415         if schema is None or isinstance(schema, (list, tuple)):
      --> 416             struct = self._inferSchemaFromList(data, names=schema)
          417             converter = _create_converter(struct)
          418             data = map(converter, data)
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/session.py in _inferSchemaFromList(self, data, names)
          346             warnings.warn("inferring schema from dict is deprecated,"
          347                           "please use pyspark.sql.Row instead")
      --> 348         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
          349         if _has_nulltype(schema):
          350             raise ValueError("Some of types cannot be determined after inferring")
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/types.py in _merge_type(a, b, name)
         1099         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
         1100                                                   name=new_name(f.name)))
      -> 1101                   for f in a.fields]
         1102         names = set([f.name for f in fields])
         1103         for n in nfs:
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/types.py in <listcomp>(.0)
         1099         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
         1100                                                   name=new_name(f.name)))
      -> 1101                   for f in a.fields]
         1102         names = set([f.name for f in fields])
         1103         for n in nfs:
      
      ~/src/pytd/.venv/lib/python3.6/site-packages/pyspark/sql/types.py in _merge_type(a, b, name)
         1092     elif type(a) is not type(b):
         1093         # TODO: type cast (such as int -> long)
      -> 1094         raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
         1095
         1096     # same type
      
      TypeError: field c: Can not merge type <class 'pyspark.sql.types.StructType'> and <class 'pyspark.sql.types.LongType'>
      
      In [15]: pyspark.__version__
      Out[15]: '2.4.5'
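
The final TypeError suggests that the fallback path inferred the pandas.NA in row 0 of column c as a StructType, which cannot be merged with the LongType inferred for the 5 in row 1. A possible workaround, sketched below and not verified against Spark 2.4.5, is to replace every pandas.NA with a plain None before calling createDataFrame. The helper name replace_pandas_na is hypothetical (it belongs to neither pandas nor PySpark), and the frame is rebuilt column by column with pd.array only to keep the example self-contained.

```python
import pandas as pd


def replace_pandas_na(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `pdf` in which every missing value is a plain None.

    Hypothetical helper for illustration; not part of pandas or PySpark.
    """
    # Casting to object drops the nullable extension dtypes (e.g. Int64);
    # where() then swaps each missing entry (pd.NA included) for None.
    as_object = pdf.astype(object)
    return as_object.where(pdf.notna(), None)


# Same data as in the report, built with nullable Int64 columns.
pdf = pd.DataFrame(
    {
        "a": pd.array([1, 3], dtype="Int64"),
        "b": pd.array([2, 4], dtype="Int64"),
        "c": pd.array([pd.NA, 5], dtype="Int64"),
    }
)

cleaned = replace_pandas_na(pdf)

# Assumes an existing SparkSession named `spark`, as in the report:
# sdf = spark.createDataFrame(cleaned)
```

The trade-off is that the cleaned columns have object dtype, so the integer type is left to Spark's schema inference. Passing an explicit schema to createDataFrame would avoid relying on inference.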
      

People

    • Assignee: Unassigned
    • Reporter: Aki Ariga (chezou)