spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkbrad...@apache.org
Subject spark git commit: [SPARK-9805] [MLLIB] [PYTHON] [STREAMING] Added _eventually for ml streaming pyspark tests
Date Sun, 16 Aug 2015 01:48:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master 570567258 -> 1db7179fa


[SPARK-9805] [MLLIB] [PYTHON] [STREAMING] Added _eventually for ml streaming pyspark tests

Recently, PySpark ML streaming tests have been flaky, most likely because of the batches not
being processed in time.  Proposal: Replace the use of _ssc_wait (which waits for a fixed
amount of time) with a method which waits for a fixed amount of time but can terminate early
based on a termination condition method.  With this, we can extend the waiting period (to
make tests less flaky) but also stop early when possible (making tests faster on average,
which I verified locally).

CC: mengxr tdas freeman-lab

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #8087 from jkbradley/streaming-ml-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1db7179f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1db7179f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1db7179f

Branch: refs/heads/master
Commit: 1db7179fae672fcec7b8de12c374dd384ce51c67
Parents: 5705672
Author: Joseph K. Bradley <joseph@databricks.com>
Authored: Sat Aug 15 18:48:20 2015 -0700
Committer: Joseph K. Bradley <joseph@databricks.com>
Committed: Sat Aug 15 18:48:20 2015 -0700

----------------------------------------------------------------------
 python/pyspark/mllib/tests.py | 177 +++++++++++++++++++++++++++----------
 1 file changed, 129 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1db7179f/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 3f5a02a..5097c5e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -32,6 +32,9 @@ from numpy import sum as array_sum
 
 from py4j.protocol import Py4JJavaError
 
+if sys.version > '3':
+    basestring = str
+
 if sys.version_info[:2] <= (2, 6):
     try:
         import unittest2 as unittest
@@ -86,9 +89,42 @@ class MLLibStreamingTestCase(unittest.TestCase):
         self.ssc.stop(False)
 
     @staticmethod
-    def _ssc_wait(start_time, end_time, sleep_time):
-        while time() - start_time < end_time:
+    def _eventually(condition, timeout=30.0, catch_assertions=False):
+        """
+        Wait a given amount of time for a condition to pass, else fail with an error.
+        This is a helper utility for streaming ML tests.
+        :param condition: Function that checks for termination conditions.
+                          condition() can return:
+                           - True: Conditions met. Return without error.
+                           - other value: Conditions not met yet. Continue. Upon timeout,
+                                          include last such value in error message.
+                          Note that this method may be called at any time during
+                          streaming execution (e.g., even before any results
+                          have been created).
+        :param timeout: Number of seconds to wait.  Default 30 seconds.
+        :param catch_assertions: If False (default), do not catch AssertionErrors.
+                                 If True, catch AssertionErrors; continue, but save
+                                 error to throw upon timeout.
+        """
+        start_time = time()
+        lastValue = None
+        while time() - start_time < timeout:
+            if catch_assertions:
+                try:
+                    lastValue = condition()
+                except AssertionError as e:
+                    lastValue = e
+            else:
+                lastValue = condition()
+            if lastValue is True:
+                return
             sleep(0.01)
+        if isinstance(lastValue, AssertionError):
+            raise lastValue
+        else:
+            raise AssertionError(
+                "Test failed due to timeout after %g sec, with last condition returning:
%s"
+                % (timeout, lastValue))
 
 
 def _squared_distance(a, b):
@@ -999,10 +1035,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
             [self.sc.parallelize(batch, 1) for batch in batches])
         stkm.trainOn(input_stream)
 
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 10.0, 0.01)
-        self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+
+        def condition():
+            self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+            return True
+        self._eventually(condition, catch_assertions=True)
+
         realCenters = array_sum(array(centers), axis=0)
         for i in range(5):
             modelCenters = stkm.latestModel().centers[0][i]
@@ -1027,7 +1066,7 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
         stkm.setInitialCenters(
             centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
 
-        # Create a toy dataset by setting a tiny offest for each point.
+        # Create a toy dataset by setting a tiny offset for each point.
         offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
         batches = []
         for offset in offsets:
@@ -1037,14 +1076,15 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
         batches = [self.sc.parallelize(batch, 1) for batch in batches]
         input_stream = self.ssc.queueStream(batches)
         stkm.trainOn(input_stream)
-        t = time()
         self.ssc.start()
 
         # Give enough time to train the model.
-        self._ssc_wait(t, 6.0, 0.01)
-        finalModel = stkm.latestModel()
-        self.assertTrue(all(finalModel.centers == array(initCenters)))
-        self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+        def condition():
+            finalModel = stkm.latestModel()
+            self.assertTrue(all(finalModel.centers == array(initCenters)))
+            self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+            return True
+        self._eventually(condition, catch_assertions=True)
 
     def test_predictOn_model(self):
         """Test that the model predicts correctly on toy data."""
@@ -1066,10 +1106,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
                 result.append(rdd_collect)
 
         predict_val.foreachRDD(update)
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 6.0, 0.01)
-        self.assertEquals(result, [[0], [1], [2], [3]])
+
+        def condition():
+            self.assertEquals(result, [[0], [1], [2], [3]])
+            return True
+
+        self._eventually(condition, catch_assertions=True)
 
     def test_trainOn_predictOn(self):
         """Test that prediction happens on the updated model."""
@@ -1095,10 +1138,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
         predict_stream = stkm.predictOn(input_stream)
         predict_stream.foreachRDD(collect)
 
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 6.0, 0.01)
-        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+        def condition():
+            self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+            return True
+
+        self._eventually(condition, catch_assertions=True)
 
 
 class LinearDataGeneratorTests(MLlibTestCase):
@@ -1156,11 +1202,14 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         slr.setInitialWeights([0.0])
         slr.trainOn(input_stream)
 
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 20.0, 0.01)
-        rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
-        self.assertAlmostEqual(rel, 0.1, 1)
+
+        def condition():
+            rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
+            self.assertAlmostEqual(rel, 0.1, 1)
+            return True
+
+        self._eventually(condition, catch_assertions=True)
 
     def test_convergence(self):
         """
@@ -1179,13 +1228,18 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         input_stream.foreachRDD(
             lambda x: models.append(slr.latestModel().weights[0]))
 
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 15.0, 0.01)
+
+        def condition():
+            self.assertEquals(len(models), len(input_batches))
+            return True
+
+        # We want all batches to finish for this test.
+        self._eventually(condition, 60.0, catch_assertions=True)
+
         t_models = array(models)
         diff = t_models[1:] - t_models[:-1]
-
-        # Test that weights improve with a small tolerance,
+        # Test that weights improve with a small tolerance
         self.assertTrue(all(diff >= -0.1))
         self.assertTrue(array_sum(diff > 0) > 1)
 
@@ -1208,9 +1262,13 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         predict_stream = slr.predictOnValues(input_stream)
         true_predicted = []
         predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 5.0, 0.01)
+
+        def condition():
+            self.assertEquals(len(true_predicted), len(input_batches))
+            return True
+
+        self._eventually(condition, catch_assertions=True)
 
         # Test that the accuracy error is no more than 0.4 on each batch.
         for batch in true_predicted:
@@ -1242,12 +1300,17 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         ps = slr.predictOnValues(predict_stream)
         ps.foreachRDD(lambda x: collect_errors(x))
 
-        t = time()
         self.ssc.start()
-        self._ssc_wait(t, 20.0, 0.01)
 
-        # Test that the improvement in error is atleast 0.3
-        self.assertTrue(errors[1] - errors[-1] > 0.3)
+        def condition():
+            # Test that the improvement in error is > 0.3
+            if len(errors) == len(predict_batches):
+                self.assertGreater(errors[1] - errors[-1], 0.3)
+            if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
+                return True
+            return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
+
+        self._eventually(condition)
 
 
 class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
@@ -1274,13 +1337,16 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
             batches.append(sc.parallelize(batch))
 
         input_stream = self.ssc.queueStream(batches)
-        t = time()
         slr.trainOn(input_stream)
         self.ssc.start()
-        self._ssc_wait(t, 10, 0.01)
-        self.assertArrayAlmostEqual(
-            slr.latestModel().weights.array, [10., 10.], 1)
-        self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+
+        def condition():
+            self.assertArrayAlmostEqual(
+                slr.latestModel().weights.array, [10., 10.], 1)
+            self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+            return True
+
+        self._eventually(condition, catch_assertions=True)
 
     def test_parameter_convergence(self):
         """Test that the model parameters improve with streaming data."""
@@ -1298,13 +1364,18 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
         input_stream = self.ssc.queueStream(batches)
         input_stream.foreachRDD(
             lambda x: model_weights.append(slr.latestModel().weights[0]))
-        t = time()
         slr.trainOn(input_stream)
         self.ssc.start()
-        self._ssc_wait(t, 10, 0.01)
 
-        model_weights = array(model_weights)
-        diff = model_weights[1:] - model_weights[:-1]
+        def condition():
+            self.assertEquals(len(model_weights), len(batches))
+            return True
+
+        # We want all batches to finish for this test.
+        self._eventually(condition, catch_assertions=True)
+
+        w = array(model_weights)
+        diff = w[1:] - w[:-1]
         self.assertTrue(all(diff >= -0.1))
 
     def test_prediction(self):
@@ -1323,13 +1394,18 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
                 sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
 
         input_stream = self.ssc.queueStream(batches)
-        t = time()
         output_stream = slr.predictOnValues(input_stream)
         samples = []
         output_stream.foreachRDD(lambda x: samples.append(x.collect()))
 
         self.ssc.start()
-        self._ssc_wait(t, 5, 0.01)
+
+        def condition():
+            self.assertEquals(len(samples), len(batches))
+            return True
+
+        # We want all batches to finish for this test.
+        self._eventually(condition, catch_assertions=True)
 
         # Test that mean absolute error on each batch is less than 0.1
         for batch in samples:
@@ -1350,22 +1426,27 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
 
         predict_batches = [
             b.map(lambda lp: (lp.label, lp.features)) for b in batches]
-        mean_absolute_errors = []
+        errors = []
 
         def func(rdd):
             true, predicted = zip(*rdd.collect())
-            mean_absolute_errors.append(mean(abs(true) - abs(predicted)))
+            errors.append(mean(abs(true) - abs(predicted)))
 
-        model_weights = []
         input_stream = self.ssc.queueStream(batches)
         output_stream = self.ssc.queueStream(predict_batches)
-        t = time()
         slr.trainOn(input_stream)
         output_stream = slr.predictOnValues(output_stream)
         output_stream.foreachRDD(func)
         self.ssc.start()
-        self._ssc_wait(t, 10, 0.01)
-        self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
+
+        def condition():
+            if len(errors) == len(predict_batches):
+                self.assertGreater(errors[1] - errors[-1], 2)
+            if len(errors) >= 3 and errors[1] - errors[-1] > 2:
+                return True
+            return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
+
+        self._eventually(condition)
 
 
 class MLUtilsTests(MLlibTestCase):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message