Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 68D09200C0C for ; Tue, 31 Jan 2017 00:03:16 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 678DD160B6C; Mon, 30 Jan 2017 23:03:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 295F1160B64 for ; Tue, 31 Jan 2017 00:03:15 +0100 (CET) Received: (qmail 25324 invoked by uid 500); 30 Jan 2017 23:03:11 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 23672 invoked by uid 99); 30 Jan 2017 23:03:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2017 23:03:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 540F9E01C8; Mon, 30 Jan 2017 23:03:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davor@apache.org To: commits@beam.apache.org Date: Mon, 30 Jan 2017 23:03:36 -0000 Message-Id: <860823f5e62f482d9784dcb164f3311a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] beam git commit: Refactoring metrics infrastructure archived-at: Mon, 30 Jan 2017 23:03:16 -0000 Refactoring metrics infrastructure Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b148f5cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b148f5cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b148f5cc Branch: refs/heads/master Commit: b148f5cc9f3e414b9cd1f605b25d50e21f626b7a Parents: e3849af Author: Pablo Authored: Mon Jan 23 17:50:21 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 15:28:49 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/metrics/execution.pxd | 31 +++++++++ sdks/python/apache_beam/metrics/execution.py | 70 ++++++++++++-------- sdks/python/apache_beam/runners/common.pxd | 2 + sdks/python/apache_beam/runners/common.py | 11 ++- .../apache_beam/runners/direct/executor.py | 12 ++-- .../runners/direct/transform_evaluator.py | 54 ++++++++------- sdks/python/setup.py | 1 + 7 files changed, 125 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd new file mode 100644 index 0000000..d89004f --- /dev/null +++ b/sdks/python/apache_beam/metrics/execution.pxd @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cimport cython + + +cdef class MetricsContainer(object): + cdef object step_name + cdef public object counters + cdef public object distributions + + +cdef class ScopedMetricsContainer(object): + cpdef enter(self) + cpdef exit(self) + cdef list _stack + cdef MetricsContainer _container http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 8f04b7b..3ba1735 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -98,36 +98,49 @@ class MetricResult(object): self.key, self.committed, self.attempted) -class MetricsEnvironment(object): +class _MetricsEnvironment(object): """Holds the MetricsContainer for every thread and other metric information. This class is not meant to be instantiated, instead being used to keep track of global state. """ - METRICS_SUPPORTED = False - _METRICS_SUPPORTED_LOCK = threading.Lock() - - PER_THREAD = threading.local() + def __init__(self): + self.METRICS_SUPPORTED = False + self._METRICS_SUPPORTED_LOCK = threading.Lock() + self.PER_THREAD = threading.local() + self.set_container_stack() + + def set_container_stack(self): + if not hasattr(self.PER_THREAD, 'container'): + self.PER_THREAD.container = [] + + def container_stack(self): + self.set_container_stack() + return self.PER_THREAD.container + + def set_metrics_supported(self, supported): + self.set_container_stack() + with self._METRICS_SUPPORTED_LOCK: + self.METRICS_SUPPORTED = supported + + def current_container(self): + self.set_container_stack() + index = len(self.PER_THREAD.container) - 1 + if index < 0: + return None + else: + return self.PER_THREAD.container[index] - @classmethod - def set_metrics_supported(cls, supported): - with cls._METRICS_SUPPORTED_LOCK: - cls.METRICS_SUPPORTED = supported + def set_current_container(self, container): + self.set_container_stack() + self.PER_THREAD.container.append(container) - @classmethod - def current_container(cls): - try: - return cls.PER_THREAD.container - except AttributeError: - return None + def unset_current_container(self): + self.set_container_stack() + self.PER_THREAD.container.pop() - @classmethod - def set_current_container(cls, container): - cls.PER_THREAD.container = container - @classmethod - def unset_current_container(cls): - del cls.PER_THREAD.container +MetricsEnvironment = _MetricsEnvironment() class MetricsContainer(object): @@ -180,16 +193,21 @@ class MetricsContainer(object): class ScopedMetricsContainer(object): - def __init__(self, container): - self._old_container = MetricsEnvironment.current_container() + def __init__(self, container=None): + self._stack = MetricsEnvironment.container_stack() self._container = container + def enter(self): + self._stack.append(self._container) + + def exit(self): + self._stack.pop() + def __enter__(self): - MetricsEnvironment.set_current_container(self._container) - return self._container + self.enter() def __exit__(self, type, value, traceback): - MetricsEnvironment.set_current_container(self._old_container) + self.exit() class MetricUpdates(object): http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 10d1f96..f41b313 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -18,6 +18,7 @@ cimport cython from apache_beam.utils.windowed_value cimport WindowedValue +from apache_beam.metrics.execution cimport ScopedMetricsContainer cdef type SideOutputValue, TimestampedValue @@ -40,6 +41,7 @@ cdef class DoFnRunner(Receiver): cdef object args cdef dict kwargs cdef object side_inputs + cdef ScopedMetricsContainer scoped_metrics_container cdef bint has_windowed_side_inputs cdef Receiver main_receivers http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 3741582..cb47513 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -22,6 +22,7 @@ import sys from apache_beam.internal import util +from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core from apache_beam.transforms import window @@ -69,7 +70,8 @@ class DoFnRunner(Receiver): logging_context=None, # Preferred alternative to context # TODO(robertwb): Remove once all runners are updated. - state=None): + state=None, + scoped_metrics_container=None): """Initializes a DoFnRunner. Args: @@ -84,10 +86,13 @@ class DoFnRunner(Receiver): step_name: the name of this step logging_context: a LoggingContext object state: handle for accessing DoFn state + scoped_metrics_container: Context switcher for metrics container """ self.step_name = step_name self.window_fn = windowing.windowfn self.tagged_receivers = tagged_receivers + self.scoped_metrics_container = (scoped_metrics_container + or ScopedMetricsContainer()) global_window = window.GlobalWindow() @@ -236,6 +241,7 @@ class DoFnRunner(Receiver): def _invoke_bundle_method(self, method): try: self.logging_context.enter() + self.scoped_metrics_container.enter() self.context.set_element(None) f = getattr(self.dofn, method) @@ -251,6 +257,7 @@ class DoFnRunner(Receiver): except BaseException as exn: self.reraise_augmented(exn) finally: + self.scoped_metrics_container.exit() self.logging_context.exit() def start(self): @@ -262,6 +269,7 @@ class DoFnRunner(Receiver): def process(self, element): try: self.logging_context.enter() + self.scoped_metrics_container.enter() if self.is_new_dofn: self.new_dofn_process(element) else: @@ -269,6 +277,7 @@ class DoFnRunner(Receiver): except BaseException as exn: self.reraise_augmented(exn) finally: + self.scoped_metrics_container.exit() self.logging_context.exit() def reraise_augmented(self, exn): http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 7e404f8..2d4a8bd 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -27,7 +27,7 @@ import traceback from weakref import WeakValueDictionary from apache_beam.metrics.execution import MetricsContainer -from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.metrics.execution import ScopedMetricsContainer class ExecutorService(object): @@ -270,7 +270,7 @@ class TransformExecutor(ExecutorService.CallableTask): self._call_count += 1 assert self._call_count <= (1 + len(self._applied_transform.side_inputs)) metrics_container = MetricsContainer(self._applied_transform.full_label) - MetricsEnvironment.set_current_container(metrics_container) + scoped_metrics_container = ScopedMetricsContainer(metrics_container) for side_input in self._applied_transform.side_inputs: if side_input not in self._side_input_values: @@ -288,14 +288,16 @@ class TransformExecutor(ExecutorService.CallableTask): try: evaluator = self._transform_evaluator_registry.for_application( - self._applied_transform, self._input_bundle, side_input_values) + self._applied_transform, self._input_bundle, + side_input_values, scoped_metrics_container) if self._input_bundle: for value in self._input_bundle.elements: evaluator.process_element(value) - result = evaluator.finish_bundle() - result.metric_updates = metrics_container.get_cumulative() + with scoped_metrics_container: + result = evaluator.finish_bundle() + result.metric_updates = metrics_container.get_cumulative() if self._evaluation_context.has_cache: for uncommitted_bundle in result.output_bundles: http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index e8d8c4c..13c87c5 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -61,7 +61,8 @@ class TransformEvaluatorRegistry(object): } def for_application( - self, applied_ptransform, input_committed_bundle, side_inputs): + self, applied_ptransform, input_committed_bundle, + side_inputs, scoped_metrics_container): """Returns a TransformEvaluator suitable for processing given inputs.""" assert applied_ptransform assert bool(applied_ptransform.side_inputs) == bool(side_inputs) @@ -78,7 +79,8 @@ class TransformEvaluatorRegistry(object): 'Execution of [%s] not implemented in runner %s.' % ( type(applied_ptransform.transform), self)) return evaluator(self._evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs) + input_committed_bundle, side_inputs, + scoped_metrics_container) def should_execute_serially(self, applied_ptransform): """Returns True if this applied_ptransform should run one bundle at a time. @@ -108,7 +110,7 @@ class _TransformEvaluator(object): """An evaluator of a specific application of a transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): self._evaluation_context = evaluation_context self._applied_ptransform = applied_ptransform self._input_committed_bundle = input_committed_bundle @@ -116,7 +118,9 @@ class _TransformEvaluator(object): self._expand_outputs() self._execution_context = evaluation_context.get_execution_context( applied_ptransform) - self.start_bundle() + self.scoped_metrics_container = scoped_metrics_container + with scoped_metrics_container: + self.start_bundle() def _expand_outputs(self): outputs = set() @@ -176,14 +180,14 @@ class _BoundedReadEvaluator(_TransformEvaluator): MAX_ELEMENT_PER_BUNDLE = 100 def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not input_committed_bundle assert not side_inputs self._source = applied_ptransform.transform.source self._source.pipeline_options = evaluation_context.pipeline_options super(_BoundedReadEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) def finish_bundle(self): assert len(self._outputs) == 1 @@ -213,11 +217,11 @@ class _FlattenEvaluator(_TransformEvaluator): """TransformEvaluator for Flatten transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not side_inputs super(_FlattenEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) def start_bundle(self): assert len(self._outputs) == 1 @@ -237,12 +241,12 @@ class _CreateEvaluator(_TransformEvaluator): """TransformEvaluator for Create transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not input_committed_bundle assert not side_inputs super(_CreateEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) def start_bundle(self): assert len(self._outputs) == 1 @@ -311,10 +315,10 @@ class _ParDoEvaluator(_TransformEvaluator): """TransformEvaluator for ParDo transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): super(_ParDoEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) def start_bundle(self): transform = self._applied_ptransform.transform @@ -358,12 +362,14 @@ class _ParDoEvaluator(_TransformEvaluator): dofn, self._applied_ptransform.full_label) else: dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label) - self.runner = DoFnRunner(dofn, transform.args, transform.kwargs, - self._side_inputs, - self._applied_ptransform.inputs[0].windowing, - tagged_receivers=self._tagged_receivers, - step_name=self._applied_ptransform.full_label, - state=DoFnState(self._counter_factory)) + self.runner = DoFnRunner( + dofn, transform.args, transform.kwargs, + self._side_inputs, + self._applied_ptransform.inputs[0].windowing, + tagged_receivers=self._tagged_receivers, + step_name=self._applied_ptransform.full_label, + state=DoFnState(self._counter_factory), + scoped_metrics_container=self.scoped_metrics_container) self.runner.start() def process_element(self, element): @@ -391,11 +397,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): self.completed = False def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not side_inputs super(_GroupByKeyOnlyEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) @property def _is_final_bundle(self): @@ -463,11 +469,11 @@ class _CreatePCollectionViewEvaluator(_TransformEvaluator): """TransformEvaluator for CreatePCollectionView transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not side_inputs super(_CreatePCollectionViewEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) @property def _is_final_bundle(self): @@ -509,11 +515,11 @@ class _NativeWriteEvaluator(_TransformEvaluator): """TransformEvaluator for _NativeWrite transform.""" def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs): + input_committed_bundle, side_inputs, scoped_metrics_container): assert not side_inputs super(_NativeWriteEvaluator, self).__init__( evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs) + side_inputs, scoped_metrics_container) assert applied_ptransform.transform.sink self._sink = applied_ptransform.transform.sink http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/setup.py ---------------------------------------------------------------------- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1fd622f..37125c2 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -118,6 +118,7 @@ setuptools.setup( '**/*.pyx', 'apache_beam/coders/coder_impl.py', 'apache_beam/runners/common.py', + 'apache_beam/metrics/execution.py', 'apache_beam/transforms/cy_combiners.py', 'apache_beam/utils/counters.py', 'apache_beam/utils/windowed_value.py',