spark-issues mailing list archives

From "Bryan Cutler (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-10086) Flaky StreamingKMeans test in PySpark
Date Sat, 13 Feb 2016 00:42:18 GMT

    [ https://issues.apache.org/jira/browse/SPARK-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145625#comment-15145625
] 

Bryan Cutler commented on SPARK-10086:
--------------------------------------

I was able to track down the cause of these failures, so here is an update with what I found.
 The test {{StreamingKMeansTest.test_trainOn_predictOn}} has two {{DStream.foreachRDD}} output
operations: one in the call to {{StreamingKMeans.trainOn}}, and one with {{collect}} whose
parent {{DStream}} is a {{PythonTransformedDStream}} returned from {{StreamingKMeans.predictOn}}.
As a result, two jobs are generated for each batch.  When the {{DStream}} jobs are generated,
there is nothing to compute for the first job, which updates the model.  Generating the second
job calls {{PythonTransformedDStream.compute}}, which triggers a {{PythonTransformFunction}}
callback that creates a {{PythonRDD}} and serializes the mapped predict function, including
the current model, into a command.

Next, the two jobs are scheduled in order: first the model update, then the collect of the
predicted result.  At this point there is a race condition between completing the model update
and generating the next set of jobs, which runs in a different thread.  If the update is
delayed long enough, the next set of jobs is generated first and the old model is serialized
into the {{PythonRDD}} command again.  The predict then runs against this stale model, causing
the test failure.
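The race can be sketched outside of Spark with plain {{pickle}} and {{threading}}; the names {{update_job}} and {{generate_jobs}} below are illustrative stand-ins for the Spark internals, not real APIs:

```python
import pickle
import threading
import time

model = {"centers": [0.0, 1.0]}
serialized = []

def update_job():
    # Stands in for the model-update job; the sleep mimics a slow batch.
    time.sleep(0.1)
    model["centers"] = [0.5, 0.25]

def generate_jobs():
    # Job generation runs concurrently and serializes the predict
    # command immediately, without waiting for the update to finish.
    serialized.append(pickle.dumps(model))

t = threading.Thread(target=update_job)
t.start()
generate_jobs()   # wins the race: pickles the old model
t.join()

print(pickle.loads(serialized[0]))  # {'centers': [0.0, 1.0]} - stale
```

Because {{pickle}} captures the model's state by value at serialization time, the later in-place update never reaches the already-serialized command, which is exactly the failure mode above.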

To sum it up, the underlying issue is that a function can be serialized together with a
captured value before the job that updates that value has run.  This does not appear to be
an issue in the Scala code, because the closure cleaner runs just before the job is executed
and therefore picks up the updated values.

So far, the best solution I can think of is to somehow delay serialization of the model
until it is actually needed, but I believe this would involve some big changes in {{PythonRDD}},
as would any other solution I could think of.  Is this something that would be worth doing to
correct, or might there be an easier fix that I am not seeing?  This is not just a
{{StreamingKMeans}} issue; it would affect any PySpark streaming application with a similar
structure.
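As a rough illustration of the delayed-serialization idea (the {{LazyCommand}} class below is hypothetical, not part of {{PythonRDD}}): instead of pickling the model at job-generation time, the command holds a reference to the live model and only pickles it when the job actually runs:

```python
import pickle

class LazyCommand:
    """Hypothetical sketch: defer pickling until execution time."""
    def __init__(self, get_model):
        # get_model is a callable returning the *live* model, so no
        # model state is captured at job-generation time.
        self._get_model = get_model

    def serialize(self):
        # Called only when the job actually runs, after any updates.
        return pickle.dumps(self._get_model())

model = {"centers": [0.0, 1.0]}
cmd = LazyCommand(lambda: model)      # job generation: nothing pickled yet

model["centers"] = [0.5, 0.25]        # the update job runs afterwards

print(pickle.loads(cmd.serialize()))  # {'centers': [0.5, 0.25]} - current
```

The design point is simply to move the pickling from the job-generation thread to the job-execution path, mirroring how the Scala closure cleaner runs just before execution.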

I am attaching some simplified code used to reproduce the issue.  I also have a similar Scala
version that produces the expected results.

> Flaky StreamingKMeans test in PySpark
> -------------------------------------
>
>                 Key: SPARK-10086
>                 URL: https://issues.apache.org/jira/browse/SPARK-10086
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark, Streaming, Tests
>    Affects Versions: 1.5.0
>            Reporter: Joseph K. Bradley
>            Priority: Critical
>
> Here's a report on investigating test failures in StreamingKMeans in PySpark. (See Jenkins
links below.)
> It is a StreamingKMeans test which trains on a DStream with 2 batches and then tests
on those same 2 batches.  It fails here: [https://github.com/apache/spark/blob/1968276af0f681fe51328b7dd795bd21724a5441/python/pyspark/mllib/tests.py#L1144]
> I recreated the same test, with variants training on: (1) the original 2 batches, (2)
just the first batch, (3) just the second batch, and (4) neither batch.  Here is code which
avoids Streaming altogether to identify what batches were processed.
> {code}
> # Assumes a live SparkContext `sc` (e.g. in the pyspark shell).
> from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
> batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
> batches = [sc.parallelize(batch) for batch in batches]
> stkm = StreamingKMeans(decayFactor=0.0, k=2)
> stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
> # Train
> def update(rdd):
>     stkm._model.update(rdd, stkm._decayFactor, stkm._timeUnit)
> # Remove one or both of these lines to test skipping batches.
> update(batches[0])
> update(batches[1])
> # Test
> def predict(rdd):
>     return stkm._model.predict(rdd)
> predict(batches[0]).collect()
> predict(batches[1]).collect()
> {code}
> *Results*:
> {code}
> ####################### EXPECTED
> [0, 1, 1]                                                                       
> [1, 0, 1]
> ####################### Skip batch 0
> [1, 0, 0]
> [0, 1, 0]
> ####################### Skip batch 1
> [0, 1, 1]
> [1, 0, 1]
> ####################### Skip both batches  (This is what we see in the test failures.)
> [0, 1, 1]
> [0, 0, 0]
> {code}
> Skipping both batches reproduces the failure.  There is no randomness in the StreamingKMeans
algorithm (since initial centers are fixed, not randomized).
> CC: [~tdas] [~freeman-lab] [~mengxr]
> Failure message:
> {code}
> ======================================================================
> FAIL: test_trainOn_predictOn (__main__.StreamingKMeansTest)
> Test that prediction happens on the updated model.
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
line 1147, in test_trainOn_predictOn
>     self._eventually(condition, catch_assertions=True)
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
line 123, in _eventually
>     raise lastValue
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
line 114, in _eventually
>     lastValue = condition()
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py",
line 1144, in condition
>     self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
> AssertionError: Lists differ: [[0, 1, 1], [0, 0, 0]] != [[0, 1, 1], [1, 0, 1]]
> First differing element 1:
> [0, 0, 0]
> [1, 0, 1]
> - [[0, 1, 1], [0, 0, 0]]
> ?                 ^^^^
> + [[0, 1, 1], [1, 0, 1]]
> ?              +++   ^
> ----------------------------------------------------------------------
> Ran 62 tests in 164.188s
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


