beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [29/50] [abbrv] beam git commit: Refactoring metrics infrastructure
Date Mon, 30 Jan 2017 23:03:36 GMT
Refactoring metrics infrastructure


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b148f5cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b148f5cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b148f5cc

Branch: refs/heads/master
Commit: b148f5cc9f3e414b9cd1f605b25d50e21f626b7a
Parents: e3849af
Author: Pablo <pabloem@google.com>
Authored: Mon Jan 23 17:50:21 2017 -0800
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Thu Jan 26 15:28:49 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/metrics/execution.pxd   | 31 +++++++++
 sdks/python/apache_beam/metrics/execution.py    | 70 ++++++++++++--------
 sdks/python/apache_beam/runners/common.pxd      |  2 +
 sdks/python/apache_beam/runners/common.py       | 11 ++-
 .../apache_beam/runners/direct/executor.py      | 12 ++--
 .../runners/direct/transform_evaluator.py       | 54 ++++++++-------
 sdks/python/setup.py                            |  1 +
 7 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd
new file mode 100644
index 0000000..d89004f
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution.pxd
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cimport cython
+
+
+cdef class MetricsContainer(object):
+  cdef object step_name
+  cdef public object counters
+  cdef public object distributions
+
+
+cdef class ScopedMetricsContainer(object):
+  cpdef enter(self)
+  cpdef exit(self)
+  cdef list _stack
+  cdef MetricsContainer _container

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 8f04b7b..3ba1735 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -98,36 +98,49 @@ class MetricResult(object):
         self.key, self.committed, self.attempted)
 
 
-class MetricsEnvironment(object):
+class _MetricsEnvironment(object):
   """Holds the MetricsContainer for every thread and other metric information.
 
   This class is not meant to be instantiated, instead being used to keep
   track of global state.
   """
-  METRICS_SUPPORTED = False
-  _METRICS_SUPPORTED_LOCK = threading.Lock()
-
-  PER_THREAD = threading.local()
+  def __init__(self):
+    self.METRICS_SUPPORTED = False
+    self._METRICS_SUPPORTED_LOCK = threading.Lock()
+    self.PER_THREAD = threading.local()
+    self.set_container_stack()
+
+  def set_container_stack(self):
+    if not hasattr(self.PER_THREAD, 'container'):
+      self.PER_THREAD.container = []
+
+  def container_stack(self):
+    self.set_container_stack()
+    return self.PER_THREAD.container
+
+  def set_metrics_supported(self, supported):
+    self.set_container_stack()
+    with self._METRICS_SUPPORTED_LOCK:
+      self.METRICS_SUPPORTED = supported
+
+  def current_container(self):
+    self.set_container_stack()
+    index = len(self.PER_THREAD.container) - 1
+    if index < 0:
+      return None
+    else:
+      return self.PER_THREAD.container[index]
 
-  @classmethod
-  def set_metrics_supported(cls, supported):
-    with cls._METRICS_SUPPORTED_LOCK:
-      cls.METRICS_SUPPORTED = supported
+  def set_current_container(self, container):
+    self.set_container_stack()
+    self.PER_THREAD.container.append(container)
 
-  @classmethod
-  def current_container(cls):
-    try:
-      return cls.PER_THREAD.container
-    except AttributeError:
-      return None
+  def unset_current_container(self):
+    self.set_container_stack()
+    self.PER_THREAD.container.pop()
 
-  @classmethod
-  def set_current_container(cls, container):
-    cls.PER_THREAD.container = container
 
-  @classmethod
-  def unset_current_container(cls):
-    del cls.PER_THREAD.container
+MetricsEnvironment = _MetricsEnvironment()
 
 
 class MetricsContainer(object):
@@ -180,16 +193,21 @@ class MetricsContainer(object):
 
 
 class ScopedMetricsContainer(object):
-  def __init__(self, container):
-    self._old_container = MetricsEnvironment.current_container()
+  def __init__(self, container=None):
+    self._stack = MetricsEnvironment.container_stack()
     self._container = container
 
+  def enter(self):
+    self._stack.append(self._container)
+
+  def exit(self):
+    self._stack.pop()
+
   def __enter__(self):
-    MetricsEnvironment.set_current_container(self._container)
-    return self._container
+    self.enter()
 
   def __exit__(self, type, value, traceback):
-    MetricsEnvironment.set_current_container(self._old_container)
+    self.exit()
 
 
 class MetricUpdates(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 10d1f96..f41b313 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -18,6 +18,7 @@
 cimport cython
 
 from apache_beam.utils.windowed_value cimport WindowedValue
+from apache_beam.metrics.execution cimport ScopedMetricsContainer
 
 
 cdef type SideOutputValue, TimestampedValue
@@ -40,6 +41,7 @@ cdef class DoFnRunner(Receiver):
   cdef object args
   cdef dict kwargs
   cdef object side_inputs
+  cdef ScopedMetricsContainer scoped_metrics_container
   cdef bint has_windowed_side_inputs
 
   cdef Receiver main_receivers

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3741582..cb47513 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -22,6 +22,7 @@
 import sys
 
 from apache_beam.internal import util
+from apache_beam.metrics.execution import ScopedMetricsContainer
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
 from apache_beam.transforms import window
@@ -69,7 +70,8 @@ class DoFnRunner(Receiver):
                logging_context=None,
                # Preferred alternative to context
                # TODO(robertwb): Remove once all runners are updated.
-               state=None):
+               state=None,
+               scoped_metrics_container=None):
     """Initializes a DoFnRunner.
 
     Args:
@@ -84,10 +86,13 @@ class DoFnRunner(Receiver):
       step_name: the name of this step
       logging_context: a LoggingContext object
       state: handle for accessing DoFn state
+      scoped_metrics_container: Context switcher for metrics container
     """
     self.step_name = step_name
     self.window_fn = windowing.windowfn
     self.tagged_receivers = tagged_receivers
+    self.scoped_metrics_container = (scoped_metrics_container
+                                     or ScopedMetricsContainer())
 
     global_window = window.GlobalWindow()
 
@@ -236,6 +241,7 @@ class DoFnRunner(Receiver):
   def _invoke_bundle_method(self, method):
     try:
       self.logging_context.enter()
+      self.scoped_metrics_container.enter()
       self.context.set_element(None)
       f = getattr(self.dofn, method)
 
@@ -251,6 +257,7 @@ class DoFnRunner(Receiver):
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:
+      self.scoped_metrics_container.exit()
       self.logging_context.exit()
 
   def start(self):
@@ -262,6 +269,7 @@ class DoFnRunner(Receiver):
   def process(self, element):
     try:
       self.logging_context.enter()
+      self.scoped_metrics_container.enter()
       if self.is_new_dofn:
         self.new_dofn_process(element)
       else:
@@ -269,6 +277,7 @@ class DoFnRunner(Receiver):
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:
+      self.scoped_metrics_container.exit()
       self.logging_context.exit()
 
   def reraise_augmented(self, exn):

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 7e404f8..2d4a8bd 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -27,7 +27,7 @@ import traceback
 from weakref import WeakValueDictionary
 
 from apache_beam.metrics.execution import MetricsContainer
-from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.execution import ScopedMetricsContainer
 
 
 class ExecutorService(object):
@@ -270,7 +270,7 @@ class TransformExecutor(ExecutorService.CallableTask):
     self._call_count += 1
     assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
     metrics_container = MetricsContainer(self._applied_transform.full_label)
-    MetricsEnvironment.set_current_container(metrics_container)
+    scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
     for side_input in self._applied_transform.side_inputs:
       if side_input not in self._side_input_values:
@@ -288,14 +288,16 @@ class TransformExecutor(ExecutorService.CallableTask):
 
     try:
       evaluator = self._transform_evaluator_registry.for_application(
-          self._applied_transform, self._input_bundle, side_input_values)
+          self._applied_transform, self._input_bundle,
+          side_input_values, scoped_metrics_container)
 
       if self._input_bundle:
         for value in self._input_bundle.elements:
           evaluator.process_element(value)
 
-      result = evaluator.finish_bundle()
-      result.metric_updates = metrics_container.get_cumulative()
+      with scoped_metrics_container:
+        result = evaluator.finish_bundle()
+        result.metric_updates = metrics_container.get_cumulative()
 
       if self._evaluation_context.has_cache:
         for uncommitted_bundle in result.output_bundles:

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index e8d8c4c..13c87c5 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -61,7 +61,8 @@ class TransformEvaluatorRegistry(object):
     }
 
   def for_application(
-      self, applied_ptransform, input_committed_bundle, side_inputs):
+      self, applied_ptransform, input_committed_bundle,
+      side_inputs, scoped_metrics_container):
     """Returns a TransformEvaluator suitable for processing given inputs."""
     assert applied_ptransform
     assert bool(applied_ptransform.side_inputs) == bool(side_inputs)
@@ -78,7 +79,8 @@ class TransformEvaluatorRegistry(object):
           'Execution of [%s] not implemented in runner %s.' % (
               type(applied_ptransform.transform), self))
     return evaluator(self._evaluation_context, applied_ptransform,
-                     input_committed_bundle, side_inputs)
+                     input_committed_bundle, side_inputs,
+                     scoped_metrics_container)
 
   def should_execute_serially(self, applied_ptransform):
     """Returns True if this applied_ptransform should run one bundle at a time.
@@ -108,7 +110,7 @@ class _TransformEvaluator(object):
   """An evaluator of a specific application of a transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     self._evaluation_context = evaluation_context
     self._applied_ptransform = applied_ptransform
     self._input_committed_bundle = input_committed_bundle
@@ -116,7 +118,9 @@ class _TransformEvaluator(object):
     self._expand_outputs()
     self._execution_context = evaluation_context.get_execution_context(
         applied_ptransform)
-    self.start_bundle()
+    self.scoped_metrics_container = scoped_metrics_container
+    with scoped_metrics_container:
+      self.start_bundle()
 
   def _expand_outputs(self):
     outputs = set()
@@ -176,14 +180,14 @@ class _BoundedReadEvaluator(_TransformEvaluator):
   MAX_ELEMENT_PER_BUNDLE = 100
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not input_committed_bundle
     assert not side_inputs
     self._source = applied_ptransform.transform.source
     self._source.pipeline_options = evaluation_context.pipeline_options
     super(_BoundedReadEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   def finish_bundle(self):
     assert len(self._outputs) == 1
@@ -213,11 +217,11 @@ class _FlattenEvaluator(_TransformEvaluator):
   """TransformEvaluator for Flatten transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
     super(_FlattenEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   def start_bundle(self):
     assert len(self._outputs) == 1
@@ -237,12 +241,12 @@ class _CreateEvaluator(_TransformEvaluator):
   """TransformEvaluator for Create transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not input_committed_bundle
     assert not side_inputs
     super(_CreateEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   def start_bundle(self):
     assert len(self._outputs) == 1
@@ -311,10 +315,10 @@ class _ParDoEvaluator(_TransformEvaluator):
   """TransformEvaluator for ParDo transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     super(_ParDoEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   def start_bundle(self):
     transform = self._applied_ptransform.transform
@@ -358,12 +362,14 @@ class _ParDoEvaluator(_TransformEvaluator):
           dofn, self._applied_ptransform.full_label)
     else:
       dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
-    self.runner = DoFnRunner(dofn, transform.args, transform.kwargs,
-                             self._side_inputs,
-                             self._applied_ptransform.inputs[0].windowing,
-                             tagged_receivers=self._tagged_receivers,
-                             step_name=self._applied_ptransform.full_label,
-                             state=DoFnState(self._counter_factory))
+    self.runner = DoFnRunner(
+        dofn, transform.args, transform.kwargs,
+        self._side_inputs,
+        self._applied_ptransform.inputs[0].windowing,
+        tagged_receivers=self._tagged_receivers,
+        step_name=self._applied_ptransform.full_label,
+        state=DoFnState(self._counter_factory),
+        scoped_metrics_container=self.scoped_metrics_container)
     self.runner.start()
 
   def process_element(self, element):
@@ -391,11 +397,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
       self.completed = False
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
     super(_GroupByKeyOnlyEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   @property
   def _is_final_bundle(self):
@@ -463,11 +469,11 @@ class _CreatePCollectionViewEvaluator(_TransformEvaluator):
   """TransformEvaluator for CreatePCollectionView transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
     super(_CreatePCollectionViewEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
   @property
   def _is_final_bundle(self):
@@ -509,11 +515,11 @@ class _NativeWriteEvaluator(_TransformEvaluator):
   """TransformEvaluator for _NativeWrite transform."""
 
   def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs):
+               input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
     super(_NativeWriteEvaluator, self).__init__(
         evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs)
+        side_inputs, scoped_metrics_container)
 
     assert applied_ptransform.transform.sink
     self._sink = applied_ptransform.transform.sink

http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1fd622f..37125c2 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -118,6 +118,7 @@ setuptools.setup(
         '**/*.pyx',
         'apache_beam/coders/coder_impl.py',
         'apache_beam/runners/common.py',
+        'apache_beam/metrics/execution.py',
         'apache_beam/transforms/cy_combiners.py',
         'apache_beam/utils/counters.py',
         'apache_beam/utils/windowed_value.py',


Mime
View raw message