beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Improve bundle retry display.
Date Sat, 02 Sep 2017 00:29:00 GMT
Repository: beam
Updated Branches:
  refs/heads/master 3aa2bef87 -> 2389d56a4


Improve bundle retry display.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/453896a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/453896a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/453896a4

Branch: refs/heads/master
Commit: 453896a4646c3fc9f73154b74abbe9f62dd2c33c
Parents: 3aa2bef
Author: Maria Garcia Herrero <mariagh@google.com>
Authored: Wed Aug 30 17:00:13 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Sep 1 17:28:40 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/executor.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/453896a4/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 3e08b52..890aa88 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -262,6 +262,8 @@ class TransformExecutor(_ExecutorService.CallableTask):
   completion callback.
   """
 
+  _MAX_RETRY_PER_BUNDLE = 4
+
   def __init__(self, transform_evaluator_registry, evaluation_context,
                input_bundle, fired_timers, applied_ptransform,
                completion_callback, transform_evaluation_state):
@@ -278,12 +280,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
     self._retry_count = 0
     # Switch to turn on/off the retry of bundles.
     pipeline_options = self._evaluation_context.pipeline_options
+    # TODO(mariagh): Remove once "bundle retry" is no longer experimental.
     if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
       self._max_retries_per_bundle = 1
     else:
-      self._max_retries_per_bundle = 4
-    # TODO(mariagh): make _max_retries_per_bundle a constant
-    # once "bundle retry" is no longer experimental.
+      self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
 
   def call(self):
     self._call_count += 1
@@ -312,12 +313,17 @@ class TransformExecutor(_ExecutorService.CallableTask):
         break
       except Exception as e:
         self._retry_count += 1
-        logging.info(
-            'Exception at bundle %r, due to an exception: %s',
+        logging.error(
+            'Exception at bundle %r, due to an exception.\n %s',
             self._input_bundle, traceback.format_exc())
         if self._retry_count == self._max_retries_per_bundle:
           logging.error('Giving up after %s attempts.',
                         self._max_retries_per_bundle)
+          if self._retry_count == 1:
+            logging.info(
+                'Use the experimental flag --direct_runner_bundle_retry'
+                ' to retry failed bundles (up to %d times).',
+                TransformExecutor._MAX_RETRY_PER_BUNDLE)
           self._completion_callback.handle_exception(self, e)
 
     self._evaluation_context.metrics().commit_physical(


Mime
View raw message