Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.3.3
Environment:
Spark version: 2.3.3
OS: CentOS Linux release 7.3.1611
Kernel: 3.10.0-862.14.4.el7.x86_64
Java: openjdk version "1.8.0_151"
Description
I use Spark ML to build my application, and I wrote some test cases based on the examples at https://spark.apache.org/docs/latest/ml-features.html to check that my application is compatible with Spark's transformers.
After upgrading Spark from 2.1.1 to 2.3.3, something strange happened. I train a Pipeline model that contains a Word2Vec transformer and save the model to the local file system. Most of the time this works well, but sometimes the save fails with an UnsupportedOperationException:
Code:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.Word2Vec

// `spark` is an existing SparkSession
val data = spark.createDataFrame(Seq(
  (1, "Hi I heard about Spark".split(" ")),
  (2, "I wish Java could use case classes".split(" ")),
  (3, "Logistic regression models are neat".split(" "))
)).toDF("label", "text")

// transformers
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)

val pipeline = new Pipeline().setStages(Array(word2Vec))
val model = pipeline.fit(data)
model.write.overwrite.save("./model_data")
// Then my application will read the model data file...
Exception:
java.lang.UnsupportedOperationException: Schema for type scala.Array[Float] is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.getPath$1(ScalaReflection.scala:173)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:298)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:150)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:150)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:386)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:380)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:380)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:150)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:150)
  at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:138)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  at org.apache.spark.ml.feature.Word2VecModel$Word2VecModelWriter.saveImpl(Word2Vec.scala:347)
  at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:103)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$saveImpl$1.apply(Pipeline.scala:254)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$saveImpl$1.apply(Pipeline.scala:253)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.saveImpl(Pipeline.scala:253)
  at org.apache.spark.ml.PipelineModel$PipelineModelWriter.saveImpl(Pipeline.scala:338)
  at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:103)
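Because the error only appears on some runs, repeating the save and recording which attempts fail might help narrow it down. The following is a minimal sketch of that idea, not something taken from Spark itself: `SaveRetry` and `collectFailures` are hypothetical names, and the commented usage assumes the `model` value from the code above and per-attempt output paths chosen only for illustration.

```scala
import scala.util.{Failure, Success, Try}

object SaveRetry {
  // Hypothetical helper: runs `action` once per attempt (1 to attempts)
  // and returns the attempt numbers that threw a non-fatal exception,
  // paired with the exception that was thrown.
  def collectFailures(attempts: Int)(action: Int => Unit): Seq[(Int, Throwable)] =
    (1 to attempts).flatMap { i =>
      Try(action(i)) match {
        case Success(_) => None
        case Failure(e) => Some(i -> e)
      }
    }
}

// Intended use with the pipeline model (assumed, not executed here):
//   val failures = SaveRetry.collectFailures(100) { i =>
//     model.write.overwrite.save(s"./model_data_$i")
//   }
//   failures.foreach { case (i, e) => println(s"attempt $i failed: $e") }
```

Keeping the exception alongside the attempt number makes it possible to check whether every failure has the same stack trace as the one above.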
Any idea how to figure out the root cause of this error? Thanks!