beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Preparing support for Structured Names in Dataflow counters
Date Wed, 09 Aug 2017 23:05:20 GMT
Repository: beam
Updated Branches:
  refs/heads/master 04f5bc6f8 -> 2e1731143


Preparing support for Structured Names in Dataflow counters


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6aadf24f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6aadf24f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6aadf24f

Branch: refs/heads/master
Commit: 6aadf24f595acee5c0fe4de8b224c31fa1977a33
Parents: 04f5bc6
Author: Pablo <pabloem@google.com>
Authored: Thu Aug 3 17:46:09 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Aug 9 16:04:56 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py      | 51 +++++++----
 sdks/python/apache_beam/utils/counters.py       | 92 +++++++++-----------
 2 files changed, 77 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6aadf24f/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index dcaf74e..a1f9301 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -710,10 +710,6 @@ def translate_value(value, metric_update_proto):
   metric_update_proto.integer = to_split_int(value)
 
 
-def translate_scalar(accumulator, metric_update):
-  metric_update.scalar = to_json_value(accumulator.value, with_type=True)
-
-
 def translate_mean(accumulator, metric_update):
   if accumulator.count:
     metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
@@ -733,20 +729,43 @@ def _use_fnapi(pipeline_options):
 
 
 # To enable a counter on the service, add it to this dictionary.
-metric_translations = {
-    cy_combiners.CountCombineFn: ('sum', translate_scalar),
-    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
-    cy_combiners.MinInt64Fn: ('min', translate_scalar),
-    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
-    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
-    cy_combiners.SumFloatFn: ('sum', translate_scalar),
-    cy_combiners.MinFloatFn: ('min', translate_scalar),
-    cy_combiners.MaxFloatFn: ('max', translate_scalar),
-    cy_combiners.MeanFloatFn: ('mean', translate_mean),
-    cy_combiners.AllCombineFn: ('and', translate_scalar),
-    cy_combiners.AnyCombineFn: ('or', translate_scalar),
+structured_counter_translations = {
+    cy_combiners.CountCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.SumInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MinInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MaxInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MeanInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_int),
+    cy_combiners.SumFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MinFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MaxFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MeanFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_float),
+    cy_combiners.AllCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.AND,
+        MetricUpdateTranslators.translate_boolean),
+    cy_combiners.AnyCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.OR,
+        MetricUpdateTranslators.translate_boolean),
 }
 
+
 counter_translations = {
     cy_combiners.CountCombineFn: (
         dataflow.NameAndKind.KindValueValuesEnum.SUM,

http://git-wip-us.apache.org/repos/asf/beam/blob/6aadf24f/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index b379461..5d029dc 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -27,6 +27,46 @@ import threading
 from apache_beam.transforms import cy_combiners
 
 
+class CounterName(object):
+  """Naming information for a counter."""
+  SYSTEM = object()
+  USER = object()
+
+  def __init__(self, name, stage_name=None, step_name=None,
+               system_name=None, namespace=None,
+               origin=None, output_index=None):
+    self.name = name
+    self.origin = origin or CounterName.SYSTEM
+    self.namespace = namespace
+    self.stage_name = stage_name
+    self.step_name = step_name
+    self.system_name = system_name
+    self.output_index = output_index
+
+  def __hash__(self):
+    return hash((self.name,
+                 self.origin,
+                 self.namespace,
+                 self.stage_name,
+                 self.step_name,
+                 self.system_name,
+                 self.output_index))
+
+  def __str__(self):
+    return '%s' % self._str_internal()
+
+  def __repr__(self):
+    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+
+  def _str_internal(self):
+    if self.origin == CounterName.USER:
+      return 'user-%s-%s' % (self.step_name, self.name)
+    elif self.origin == CounterName.SYSTEM and self.output_index:
+      return '%s-out%s-%s' % (self.step_name, self.output_index, self.name)
+    else:
+      return '%s-%s-%s' % (self.stage_name, self.step_name, self.name)
+
+
 class Counter(object):
   """A counter aggregates a series of values.
 
@@ -52,8 +92,8 @@ class Counter(object):
     """Creates a Counter object.
 
     Args:
-      name: the name of this counter.  Typically has three parts:
-        "step-output-counter".
+      name: the name of this counter. It may be a string,
+            or a CounterName object.
       combine_fn: the CombineFn to use for aggregation
     """
     self.name = name
@@ -90,10 +130,6 @@ class AccumulatorCombineFnCounter(Counter):
     self._fast_add_input(value)
 
 
-# Counters that represent Accumulators have names starting with this
-USER_COUNTER_PREFIX = 'user-'
-
-
 class CounterFactory(object):
   """Keeps track of unique counters."""
 
@@ -128,21 +164,6 @@ class CounterFactory(object):
         self.counters[name] = counter
       return counter
 
-  def get_aggregator_counter(self, step_name, aggregator):
-    """Returns an AggregationCounter for this step's aggregator.
-
-    Passing in the same values will return the same counter.
-
-    Args:
-      step_name: the name of this step.
-      aggregator: an Aggregator object.
-    Returns:
-      A new or existing counter.
-    """
-    return self.get_counter(
-        '%s%s-%s' % (USER_COUNTER_PREFIX, step_name, aggregator.name),
-        aggregator.combine_fn)
-
   def get_counters(self):
     """Returns the current set of counters.
 
@@ -154,32 +175,3 @@ class CounterFactory(object):
     """
     with self._lock:
       return self.counters.values()
-
-  def get_aggregator_values(self, aggregator_or_name):
-    """Returns dict of step names to values of the aggregator."""
-    with self._lock:
-      return get_aggregator_values(
-          aggregator_or_name, self.counters, lambda counter: counter.value())
-
-
-def get_aggregator_values(aggregator_or_name, counter_dict,
-                          value_extractor=None):
-  """Extracts the named aggregator value from a set of counters.
-
-  Args:
-    aggregator_or_name: an Aggregator object or the name of one.
-    counter_dict: a dict object of {name: value_wrapper}
-    value_extractor: a function to convert the value_wrapper into a value.
-      If None, no extraction is done and the value is return unchanged.
-
-  Returns:
-    dict of step names to values of the aggregator.
-  """
-  name = aggregator_or_name
-  if value_extractor is None:
-    value_extractor = lambda x: x
-  if not isinstance(aggregator_or_name, basestring):
-    name = aggregator_or_name.name
-    return {n: value_extractor(c) for n, c in counter_dict.iteritems()
-            if n.startswith(USER_COUNTER_PREFIX)
-            and n.endswith('-%s' % name)}


Mime
View raw message