beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [01/50] [abbrv] beam git commit: Update DataflowPipelineResult.state at the end of poll_for_job_completion.
Date Mon, 30 Jan 2017 23:03:08 GMT
Repository: beam
Updated Branches:
  refs/heads/master 847e4e9f0 -> c3b97a287


Update DataflowPipelineResult.state at the end of poll_for_job_completion.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56512ab4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56512ab4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56512ab4

Branch: refs/heads/master
Commit: 56512ab442c599c64bfdb9fc6cabce95d76ee4dc
Parents: c03e6f3
Author: Ahmet Altay <altay@google.com>
Authored: Fri Jan 20 23:43:42 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Jan 20 23:43:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py      | 6 ++++--
 sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index bd25dbf..31d3386 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -78,7 +78,7 @@ class DataflowRunner(PipelineRunner):
     return 's%s' % self._unique_step_id
 
   @staticmethod
-  def poll_for_job_completion(runner, job_id):
+  def poll_for_job_completion(runner, result):
     """Polls for the specified job to finish running (successfully or not)."""
     last_message_time = None
     last_message_id = None
@@ -101,6 +101,7 @@ class DataflowRunner(PipelineRunner):
       else:
         return 0
 
+    job_id = result.job_id()
     while True:
       response = runner.dataflow_client.get_job(job_id)
       # If get() is called very soon after Create() the response may not contain
@@ -151,6 +152,7 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
+    result._job = response
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -694,7 +696,7 @@ class DataflowPipelineResult(PipelineResult):
 
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
-          args=(self._runner, self.job_id()))
+          args=(self._runner, self))
 
       # Mark the thread as a daemon thread so a keyboard interrupt on the main
       # thread will terminate everything. This is also the reason we will not

http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
index a935c98..4983899 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -47,12 +47,11 @@ class DataflowRunnerTest(unittest.TestCase):
         self.dataflow_client.list_messages = mock.MagicMock(
             return_value=([], None))
 
-    with self.assertRaises(DataflowRuntimeException) as e:
+    with self.assertRaisesRegexp(
+        DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
       failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
       failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
       failed_result.wait_until_finish()
-    self.assertTrue(
-        'Dataflow pipeline failed. State: FAILED' in e.exception.message)
 
     succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
     succeeded_result = DataflowPipelineResult(


Mime
View raw message