beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Metrics are queriable from Python SDK
Date Fri, 05 May 2017 20:42:56 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0490d6b36 -> da164a3fb


Metrics are queriable from Python SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0afb140d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0afb140d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0afb140d

Branch: refs/heads/master
Commit: 0afb140daaeaabded6a9873d4ab5710a400b2125
Parents: 0490d6b
Author: Pablo <pabloem@google.com>
Authored: Wed May 3 15:50:15 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Fri May 5 13:41:25 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    |  13 +-
 sdks/python/apache_beam/examples/wordcount.py   |   7 +-
 sdks/python/apache_beam/metrics/cells.py        |  16 +-
 sdks/python/apache_beam/metrics/execution.py    |   2 +-
 .../runners/dataflow/dataflow_metrics.py        | 114 ++++++++---
 .../runners/dataflow/dataflow_metrics_test.py   | 197 +++++++++++++------
 6 files changed, 253 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 6e7ad76..d694cda 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -88,6 +88,7 @@ class WordExtractingDoFn(beam.DoFn):
     self.empty_line_counter = Metrics.counter('main', 'empty_lines')
     self.word_length_counter = Metrics.counter('main', 'word_lengths')
     self.word_counter = Metrics.counter('main', 'total_words')
+    self.word_lengths_dist = Metrics.distribution('main', 'word_len_dist')
 
   def process(self, element):
     """Returns an iterator over words in contents of Cloud Datastore entity.
@@ -107,6 +108,7 @@ class WordExtractingDoFn(beam.DoFn):
     words = re.findall(r'[A-Za-z\']+', text_line)
     for w in words:
       self.word_length_counter.inc(len(w))
+      self.word_lengths_dist.update(len(w))
       self.word_counter.inc()
     return words
 
@@ -254,7 +256,16 @@ def run(argv=None):
   if query_result['counters']:
     empty_lines_counter = query_result['counters'][0]
     logging.info('number of empty lines: %d', empty_lines_counter.committed)
-  # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
+  else:
+    logging.warn('unable to retrieve counter metrics from runner')
+
+  word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+  query_result = result.metrics().query(word_lengths_filter)
+  if query_result['distributions']:
+    word_lengths_dist = query_result['distributions'][0]
+    logging.info('average word length: %d', word_lengths_dist.committed.mean)
+  else:
+    logging.warn('unable to retrieve distribution metrics from runner')
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e93fd2b..adcc4db 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -117,7 +117,12 @@ def run(argv=None):
     if query_result['counters']:
       empty_lines_counter = query_result['counters'][0]
       logging.info('number of empty lines: %d', empty_lines_counter.committed)
-    # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
+
+    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+    query_result = result.metrics().query(word_lengths_filter)
+    if query_result['distributions']:
+      word_lengths_dist = query_result['distributions'][0]
+      logging.info('average word length: %d', word_lengths_dist.committed.mean)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index c421949..41d24be 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -193,6 +193,13 @@ class DistributionResult(object):
   def __eq__(self, other):
     return self.data == other.data
 
+  def __repr__(self):
+    return '<DistributionResult(sum={}, count={}, min={}, max={})>'.format(
+        self.sum,
+        self.count,
+        self.min,
+        self.max)
+
   @property
   def max(self):
     return self.data.max
@@ -244,10 +251,11 @@ class DistributionData(object):
     return not self.__eq__(other)
 
   def __repr__(self):
-    return '<DistributionData({}, {}, {}, {})>'.format(self.sum,
-                                                       self.count,
-                                                       self.min,
-                                                       self.max)
+    return '<DistributionData(sum={}, count={}, min={}, max={})>'.format(
+        self.sum,
+        self.count,
+        self.min,
+        self.max)
 
   def get_cumulative(self):
     return DistributionData(self.sum, self.count, self.min, self.max)

http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 887423b..dbd0533 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -96,7 +96,7 @@ class MetricResult(object):
 
   def __str__(self):
     return 'MetricResult(key={}, committed={}, attempted={})'.format(
-        self.key, self.committed, self.attempted)
+        self.key, str(self.committed), str(self.attempted))
 
 
 class _MetricsEnvironment(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index fe0122f..24916fd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -22,14 +22,31 @@ service.
 """
 
 from collections import defaultdict
-from warnings import warn
+import numbers
 
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metricbase import MetricName
 
 
+def _get_match(proto, filter_fn):
+  """Finds and returns the single element that matches a query.
+
+  If no element matches the query, it raises ValueError.
+  If more than one element matches the query, it also raises ValueError.
+  """
+  query = [elm for elm in proto if filter_fn(elm)]
+  if len(query) == 0:
+    raise ValueError('Could not find element')
+  elif len(query) > 1:
+    raise ValueError('Too many matches')
+
+  return query[0]
+
+
 class DataflowMetrics(MetricResults):
   """Implementation of MetricResults class for the Dataflow runner."""
 
@@ -51,18 +68,25 @@ class DataflowMetrics(MetricResults):
     self._cached_metrics = None
     self._job_graph = job_graph
 
+  @staticmethod
+  def _is_counter(metric_result):
+    return isinstance(metric_result.attempted, numbers.Number)
+
+  @staticmethod
+  def _is_distribution(metric_result):
+    return isinstance(metric_result.attempted, DistributionResult)
+
   def _translate_step_name(self, internal_name):
     """Translate between internal step names (e.g. "s1") and user step names."""
     if not self._job_graph:
       raise ValueError('Could not translate the internal step name.')
 
     try:
-      [step] = [step
-                for step in self._job_graph.proto.steps
-                if step.name == internal_name]
-      [user_step_name] = [prop.value.string_value
-                          for prop in step.properties.additionalProperties
-                          if prop.key == 'user_name']
+      step = _get_match(self._job_graph.proto.steps,
+                        lambda x: x.name == internal_name)
+      user_step_name = _get_match(
+          step.properties.additionalProperties,
+          lambda x: x.key == 'user_name').value.string_value
     except ValueError:
       raise ValueError('Could not translate the internal step name.')
     return user_step_name
@@ -77,19 +101,15 @@ class DataflowMetrics(MetricResults):
       #   step name (only happens for unstructured-named metrics).
       # 2. Unable to unpack [step] or [namespace]; which should only happen
       #   for unstructured names.
-      [step] = [prop.value
-                for prop in metric.name.context.additionalProperties
-                if prop.key == 'step']
+      step = _get_match(metric.name.context.additionalProperties,
+                        lambda x: x.key == 'step').value
       step = self._translate_step_name(step)
-      [namespace] = [prop.value
-                     for prop in metric.name.context.additionalProperties
-                     if prop.key == 'namespace']
+      namespace = _get_match(metric.name.context.additionalProperties,
+                             lambda x: x.key == 'namespace').value
       name = metric.name.name
     except ValueError:
-      # An unstructured metric name is "step/namespace/name", but step names
-      # can (and often do) contain slashes. Must only split on the right-most
-      # two slashes, to preserve the full step name.
-      [step, namespace, name] = metric.name.name.rsplit('/', 2)
+      return None
+
     return MetricKey(step, MetricName(namespace, name))
 
   def _populate_metric_results(self, response):
@@ -101,14 +121,17 @@ class DataflowMetrics(MetricResults):
     # Get the tentative/committed versions of every metric together.
     metrics_by_name = defaultdict(lambda: {})
     for metric in user_metrics:
-      if (metric.name.name.endswith('(DIST)') or
-          metric.name.name.endswith('[MIN]') or
+      if (metric.name.name.endswith('[MIN]') or
           metric.name.name.endswith('[MAX]') or
           metric.name.name.endswith('[MEAN]') or
           metric.name.name.endswith('[COUNT]')):
-        warn('Distribution metrics will be ignored in the MetricsResult.query'
-             'method. You can see them in the Dataflow User Interface.')
-        # Distributions are not yet fully supported in this runner
+        # The Dataflow Service presents distribution metrics in two ways:
+        # One way is as a single distribution object with all its fields, and
+        # another way is as four different scalar metrics labeled as [MIN],
+        # [MAX], [COUNT], [MEAN].
+        # TODO(pabloem) remove these when distributions are not being broken up
+        #  in the service.
+        # The second way is only useful for the UI, and should be ignored.
         continue
       is_tentative = [prop
                       for prop in metric.name.context.additionalProperties
@@ -116,22 +139,49 @@ class DataflowMetrics(MetricResults):
       tentative_or_committed = 'tentative' if is_tentative else 'committed'
 
       metric_key = self._get_metric_key(metric)
+      if metric_key is None:
+        continue
       metrics_by_name[metric_key][tentative_or_committed] = metric
 
     # Now we create the MetricResult elements.
     result = []
     for metric_key, metric in metrics_by_name.iteritems():
-      if (metric['tentative'].scalar is None or
-          metric['committed'].scalar is None):
+      attempted = self._get_metric_value(metric['tentative'])
+      committed = self._get_metric_value(metric['committed'])
+      if attempted is None or committed is None:
         continue
-      attempted = metric['tentative'].scalar.integer_value
-      committed = metric['committed'].scalar.integer_value
       result.append(MetricResult(metric_key,
                                  attempted=attempted,
                                  committed=committed))
 
     return result
 
+  def _get_metric_value(self, metric):
+    """Get a metric result object from a MetricUpdate from Dataflow API."""
+    if metric is None:
+      return None
+
+    if metric.scalar is not None:
+      return metric.scalar.integer_value
+    elif metric.distribution is not None:
+      dist_count = _get_match(metric.distribution.object_value.properties,
+                              lambda x: x.key == 'count').value.integer_value
+      dist_min = _get_match(metric.distribution.object_value.properties,
+                            lambda x: x.key == 'min').value.integer_value
+      dist_max = _get_match(metric.distribution.object_value.properties,
+                            lambda x: x.key == 'max').value.integer_value
+      dist_mean = _get_match(metric.distribution.object_value.properties,
+                             lambda x: x.key == 'mean').value.integer_value
+      # Calculating dist_sum with a hack, as distribution sum is not yet
+      # available in the Dataflow API.
+      # TODO(pabloem) Switch to "sum" field once it's available in the API
+      dist_sum = dist_count * dist_mean
+      return DistributionResult(
+          DistributionData(
+              dist_sum, dist_count, dist_min, dist_max))
+    else:
+      return None
+
   def _get_metrics_from_dataflow(self):
     """Return cached metrics or query the dataflow service."""
     try:
@@ -152,7 +202,11 @@ class DataflowMetrics(MetricResults):
 
   def query(self, filter=None):
     response = self._get_metrics_from_dataflow()
-    counters = self._populate_metric_results(response)
-    # TODO(pabloem): Populate distributions once they are available.
-    return {'counters': [c for c in counters if self.matches(filter, c.key)],
-            'distributions': []}
+    metric_results = self._populate_metric_results(response)
+    return {'counters': [elm for elm in metric_results
+                         if self.matches(filter, elm.key)
+                         and DataflowMetrics._is_counter(elm)],
+            'distributions': [elm for elm in metric_results
+                              if self.matches(filter, elm.key)
+                              and DataflowMetrics._is_distribution(elm)],
+            'gauges': []} # Gauges are not currently supported by dataflow

http://git-wip-us.apache.org/repos/asf/beam/blob/0afb140d/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 51a186d..dd3cbe1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -23,6 +23,8 @@ import unittest
 
 import mock
 
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metricbase import MetricName
@@ -43,7 +45,7 @@ class DictToObject(object):
 
 class TestDataflowMetrics(unittest.TestCase):
 
-  STRUCTURED_COUNTER_LIST = {"metrics": [
+  ONLY_COUNTERS_LIST = {"metrics": [
       {"name": {"context":
                 {"additionalProperties": [
                     {"key": "namespace",
@@ -53,10 +55,11 @@ class TestDataflowMetrics(unittest.TestCase):
                     {"key": "tentative",
                      "value": "true"}]
                 },
-                "name": "word_lengths",
+                "name": "words",
                 "origin": "user"
                },
-       "scalar": {"integer_value": 109475},
+       "scalar": {"integer_value": 26185},
+       "distribution": None,
        "updateTime": "2017-03-22T18:47:06.402Z"
       },
       {"name": {"context":
@@ -66,76 +69,137 @@ class TestDataflowMetrics(unittest.TestCase):
                     {"key": "step",
                      "value": "s2"}]
                 },
-                "name": "word_lengths",
+                "name": "words",
                 "origin": "user"
                },
-       "scalar": {"integer_value": 109475},
+       "scalar": {"integer_value": 26181},
+       "distribution": None,
        "updateTime": "2017-03-22T18:47:06.402Z"
       },
-  ]}
-
-  BASIC_COUNTER_LIST = {"metrics": [
       {"name": {"context":
-                {"additionalProperties":[
-                    {"key": "original_name",
-                     "value": "user-split-split/__main__.WordExtractingDoFn/"
-                              "empty_lines_TentativeAggregateValue"},
-                    {"key": "step", "value": "split"}]},
-                "name": "split/__main__.WordExtractingDoFn/empty_lines",
-                "origin": "user"},
+                {"additionalProperties": [
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"},
+                    {"key": "tentative",
+                     "value": "true"}]
+                },
+                "name": "empty_lines",
+                "origin": "user"
+               },
        "scalar": {"integer_value": 1080},
-       "updateTime": "2017-02-23T01:13:36.659Z"},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
       {"name": {"context":
                 {"additionalProperties": [
-                    {"key": "original_name",
-                     "value": "user-split-split/__main__.WordExtractingDoFn/"
-                              "empty_lines_TentativeAggregateValue"},
-                    {"key": "step", "value": "split"},
-                    {"key": "tentative", "value": "true"}]},
-                "name": "split/__main__.WordExtractingDoFn/empty_lines",
-                "origin": "user"},
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"}]
+                },
+                "name": "empty_lines",
+                "origin": "user"
+               },
        "scalar": {"integer_value": 1080},
-       "updateTime": "2017-02-23T01:13:36.659Z"},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+  ]}
+  STRUCTURED_COUNTER_LIST = {"metrics": [
       {"name": {"context":
                 {"additionalProperties": [
-                    {"key": "original_name",
-                     "value": "user-split-split/__main__.WordExtractingDoFn/"
-                              "words_TentativeAggregateValue"},
-                    {"key": "step", "value": "split"}]},
-                "name": "longstepname/split/__main__.WordExtractingDoFn/words",
-                "origin": "user"},
-       "scalar": {"integer_value": 26181},
-       "updateTime": "2017-02-23T01:13:36.659Z"},
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"},
+                    {"key": "tentative",
+                     "value": "true"}]
+                },
+                "name": "word_lengths",
+                "origin": "user"
+               },
+       "scalar": {"integer_value": 109475},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
       {"name": {"context":
                 {"additionalProperties": [
-                    {"key": "original_name",
-                     "value": "user-split-longstepname/split/"
-                              "__main__.WordExtractingDoFn/"
-                              "words_TentativeAggregateValue"},
-                    {"key": "step", "value": "split"},
-                    {"key": "tentative", "value": "true"}]},
-                "name": "longstepname/split/__main__.WordExtractingDoFn/words",
-                "origin": "user"},
-       "scalar": {"integer_value": 26185},
-       "updateTime": "2017-02-23T01:13:36.659Z"},
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"}]
+                },
+                "name": "word_lengths",
+                "origin": "user"
+               },
+       "scalar": {"integer_value": 109475},
+       "distribution": None,
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
       {"name": {"context":
                 {"additionalProperties": [
-                    {"key": "original_name",
-                     "value": "user-split-split/__main__.WordExtractingDoFn/"
-                              "secretdistribution(DIST)"},
-                    {"key": "step", "value": "split"},
-                    {"key": "tentative", "value": "true"}]},
-                "name":
-                "split/__main__.WordExtractingDoFn/secretdistribution(DIST)",
-                "origin": "user"},
-       "scalar": {"integer_value": 15},
-       "updateTime": "2017-02-23T01:13:36.659Z"}
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"},
+                    {"key": "tentative",
+                     "value": "true"}]
+                },
+                "name": "word_length_dist",
+                "origin": "user"
+               },
+       "scalar": None,
+       "distribution": {
+           "object_value": {
+               "properties": [
+                   {"key": "min", "value":
+                    {"integer_value": 2}},
+                   {"key": "max", "value":
+                    {"integer_value": 16}},
+                   {"key": "count", "value":
+                    {"integer_value": 2}},
+                   {"key": "mean", "value":
+                    {"integer_value": 9}},
+                   {"key": "sum", "value":
+                    {"integer_value": 18}},]
+           }
+       },
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "namespace",
+                     "value": "__main__.WordExtractingDoFn"},
+                    {"key": "step",
+                     "value": "s2"}]
+                },
+                "name": "word_length_dist",
+                "origin": "user"
+               },
+       "scalar": None,
+       "distribution": {
+           "object_value": {
+               "properties": [
+                   {"key": "min", "value":
+                    {"integer_value": 2}},
+                   {"key": "max", "value":
+                    {"integer_value": 16}},
+                   {"key": "count", "value":
+                    {"integer_value": 2}},
+                   {"key": "mean", "value":
+                    {"integer_value": 9}},
+                   {"key": "sum", "value":
+                    {"integer_value": 18}},
+               ]
+           }
+       },
+       "updateTime": "2017-03-22T18:47:06.402Z"
+      },
   ]}
 
   def setup_mock_client_result(self, counter_list=None):
-    if counter_list is None:
-      counter_list = self.BASIC_COUNTER_LIST
-
     mock_client = mock.Mock()
     mock_query_result = DictToObject(counter_list)
     mock_client.get_job_metrics.return_value = mock_query_result
@@ -145,7 +209,8 @@ class TestDataflowMetrics(unittest.TestCase):
     return mock_client, mock_job_result
 
   def test_cache_functions(self):
-    mock_client, mock_job_result = self.setup_mock_client_result()
+    mock_client, mock_job_result = self.setup_mock_client_result(
+        self.STRUCTURED_COUNTER_LIST)
     dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
 
     # At first creation, we should always query dataflow.
@@ -160,7 +225,7 @@ class TestDataflowMetrics(unittest.TestCase):
     dm.query()
     self.assertTrue(dm._cached_metrics)
 
-  def test_query_structured_counters(self):
+  def test_query_structured_metrics(self):
     mock_client, mock_job_result = self.setup_mock_client_result(
         self.STRUCTURED_COUNTER_LIST)
     dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
@@ -175,9 +240,23 @@ class TestDataflowMetrics(unittest.TestCase):
         ]
     self.assertEqual(query_result['counters'], expected_counters)
 
+    expected_distributions = [
+        MetricResult(
+            MetricKey('split',
+                      MetricName('__main__.WordExtractingDoFn',
+                                 'word_length_dist')),
+            DistributionResult(DistributionData(
+                18, 2, 2, 16)),
+            DistributionResult(DistributionData(
+                18, 2, 2, 16))),
+        ]
+    self.assertEqual(query_result['distributions'], expected_distributions)
+
   def test_query_counters(self):
-    mock_client, mock_job_result = self.setup_mock_client_result()
+    mock_client, mock_job_result = self.setup_mock_client_result(
+        self.ONLY_COUNTERS_LIST)
     dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+    dm._translate_step_name = types.MethodType(lambda self, x: 'split', dm)
     query_result = dm.query()
     expected_counters = [
         MetricResult(
@@ -185,7 +264,7 @@ class TestDataflowMetrics(unittest.TestCase):
                       MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
             1080, 1080),
         MetricResult(
-            MetricKey('longstepname/split',
+            MetricKey('split',
                       MetricName('__main__.WordExtractingDoFn', 'words')),
             26181, 26185),
         ]


Mime
View raw message