spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ofer Mendelevitch <omendelevi...@hortonworks.com>
Subject Re: Strange behavior with PySpark when using Join() and zip()
Date Mon, 23 Mar 2015 19:13:59 GMT
Thanks Sean,

Sorting definitely solves it, but I was hoping it could be avoided :)

In the documentation for Classification in ML-Lib for example, zip() is used to create labelsAndPredictions:

-----
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto”, impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
----


The reason the zip() works here is because the RDD is loaded from a file.
If it was generated with something that includes a JOIN() it won’t work due to this same
issue.

Maybe worth mentioning in the docs then?

Ofer



> On Mar 23, 2015, at 11:40 AM, Sean Owen <sowen@cloudera.com> wrote:
> 
> I think the explanation is that the join does not guarantee any order,
> since it causes a shuffle in general, and it is computed twice in the
> first example, resulting in a difference for d1 and d2.
> 
> You can persist() the result of the join and in practice I believe
> you'd find it behaves as expected, although that is even not 100%
> guaranteed since a block could be lost and recomputed (differently).
> 
> If order matters, and it does for zip(), then the reliable way to
> guarantee a well defined ordering for zipping is to sort the RDDs.
> 
> On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
> <omendelevitch@hortonworks.com> wrote:
>> Hi,
>> 
>> I am running into a strange issue when doing a JOIN of two RDDs followed by
>> ZIP from PySpark.
>> It’s part of a more complex application, but was able to narrow it down to a
>> simplified example that’s easy to replicate and causes the same problem to
>> appear:
>> 
>> 
>> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
>> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
>> ','.join([x for x in pair[1]]))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> 
>> print x.take(10)
>> 
>> 
>> The output is:
>> 
>> 
>> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
>> 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
>> ('v83', 'v83')]
>> 
>> 
>> As you can see, the ordering of items is not preserved anymore in all cases.
>> (e.g., ‘v81’ is preserved, and ‘v45’ is not)
>> Is it not supposed to be preserved?
>> 
>> If I do the same thing without the JOIN:
>> 
>> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> 
>> print x.take(10)
>> 
>> The output is:
>> 
>> 
>> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
>> ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>> 
>> 
>> As expected.
>> 
>> Anyone run into this or a similar issue?
>> 
>> Ofer

Mime
View raw message