beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/3] beam git commit: Make TestPipeline.run fail when the underlying execution fails.
Date Sat, 21 Jan 2017 00:46:46 GMT
Repository: beam
Updated Branches:
  refs/heads/python-sdk c57c66ed4 -> 82599a241


Make TestPipeline.run fail when the underlying execution fails.

Also, when a Dataflow pipeline finishes in any state other than DONE,
wait_until_finish now logs the final state and the runner's last error
message.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa3a2cb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa3a2cb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa3a2cb3

Branch: refs/heads/python-sdk
Commit: aa3a2cb326a5f761eba9fe87fe7d57da9ce78555
Parents: c57c66e
Author: Ahmet Altay <altay@google.com>
Authored: Thu Jan 19 16:13:07 2017 -0800
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Fri Jan 20 16:46:20 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py    | 12 ++++--------
 sdks/python/apache_beam/test_pipeline.py              |  5 ++++-
 sdks/python/apache_beam/transforms/aggregator_test.py |  2 +-
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 330472b..fd22753 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -704,6 +704,10 @@ class DataflowPipelineResult(PipelineResult):
       thread.start()
       while thread.isAlive():
         time.sleep(5.0)
+      if self.state != PipelineState.DONE:
+        logging.error(
+            'Dataflow pipeline failed. State: %s, Error:\n%s',
+            self.state, getattr(self._runner, 'last_error_msg', None))
     return self.state
 
   def __str__(self):
@@ -714,11 +718,3 @@ class DataflowPipelineResult(PipelineResult):
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
-
-
-class DataflowRuntimeException(Exception):
-  """Indicates an error has occurred in running this pipeline."""
-
-  def __init__(self, msg, result):
-    super(DataflowRuntimeException, self).__init__(msg)
-    self.result = result

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index c29a879..7d85af9 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,6 +22,7 @@ import shlex
 
 from apache_beam.internal import pickler
 from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
 from apache_beam.utils.pipeline_options import PipelineOptions
 from nose.plugins.skip import SkipTest
 
@@ -89,7 +90,9 @@ class TestPipeline(Pipeline):
   def run(self):
     result = super(TestPipeline, self).run()
     if self.blocking:
-      result.wait_until_finish()
+      state = result.wait_until_finish()
+      assert state == PipelineState.DONE, "Pipeline execution failed."
+
     return result
 
   def _parse_test_option_args(self, argv):

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index d493c46..a2a4144 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -20,9 +20,9 @@
 import unittest
 
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import combiners
 from apache_beam.transforms.aggregator import Aggregator
-from apache_beam.test_pipeline import TestPipeline
 
 
 class AggregatorTest(unittest.TestCase):


Mime
View raw message