Description
Training a RandomForest classifier on a dataset obtained as a union of two RDDs throws a broken pipe error:
Traceback (most recent call last):
  File "/home/laskov/code/spark-1.2.1/python/pyspark/daemon.py", line 162, in manager
    code = worker(sock)
  File "/home/laskov/code/spark-1.2.1/python/pyspark/daemon.py", line 64, in worker
    outfile.flush()
IOError: [Errno 32] Broken pipe
Despite the error, the job runs to completion.
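For context on the error itself: errno 32 (EPIPE) is what the operating system reports when a process writes to a pipe or socket whose other end has already been closed. The traceback shows the Python worker failing in outfile.flush(), so presumably the receiving side had gone away by then, although that is an inference from the traceback and not something confirmed here. The sketch below is not Spark code. It reproduces the same OS-level error on a POSIX system with a plain pipe, and the function name write_to_closed_pipe is hypothetical.

```python
import errno
import os


def write_to_closed_pipe():
    """Write to a pipe whose read end is already closed.

    Returns the errno of the resulting OSError, or None if the write
    unexpectedly succeeds. Assumes a POSIX system.
    """
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # the reader goes away before the writer is done
    try:
        os.write(write_fd, b"result")
    except OSError as exc:
        # Python ignores SIGPIPE by default, so the failed write
        # surfaces as an exception carrying errno.EPIPE (32).
        return exc.errno
    finally:
        os.close(write_fd)
    return None


if __name__ == "__main__":
    print(write_to_closed_pipe() == errno.EPIPE)
```
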
The following code reproduces the error:
from pyspark.context import SparkContext
from pyspark.mllib.rand import RandomRDDs
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
import random

if __name__ == "__main__":
    sc = SparkContext(appName="Union bug test")

    # Two RDDs of random 200-dimensional vectors, each with a random label (0 or 1)
    data1 = RandomRDDs.normalVectorRDD(sc, numRows=10000, numCols=200)
    data1 = data1.map(lambda x: LabeledPoint(random.randint(0, 1),
                                             DenseVector(x)))
    data2 = RandomRDDs.normalVectorRDD(sc, numRows=10000, numCols=200)
    data2 = data2.map(lambda x: LabeledPoint(random.randint(0, 1),
                                             DenseVector(x)))

    # Training on the union of the two RDDs triggers the broken pipe error
    training_data = data1.union(data2)
    #training_data = training_data.repartition(2)
    model = RandomForest.trainClassifier(training_data, numClasses=2,
                                         categoricalFeaturesInfo={},
                                         numTrees=50, maxDepth=30)
Interestingly, re-partitioning the data after the union operation rectifies the problem (uncomment the line before training in the code above).
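The workaround can be wrapped in a small helper so that the union and the repartition always happen together. This is only a sketch: the helper name union_and_repartition and its default of 2 partitions (taken from the commented-out line in the reproduction) are illustrative assumptions. It relies only on the union and repartition methods of the RDD objects passed in.

```python
def union_and_repartition(rdd_a, rdd_b, num_partitions=2):
    """Union two RDDs and repartition the result before training.

    Hypothetical helper: it only calls RDD.union and RDD.repartition,
    the same two calls used in the reproduction script.
    """
    combined = rdd_a.union(rdd_b)
    return combined.repartition(num_partitions)
```

In the reproduction script this would replace the union line, for example training_data = union_and_repartition(data1, data2). Note that repartition shuffles the data, so this sidesteps the error at some extra cost and does not explain the underlying cause.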