beam-commits mailing list archives

From al...@apache.org
Subject [1/2] beam git commit: Making metrics queryable in Python SDK.
Date Sat, 25 Feb 2017 01:54:59 GMT
Repository: beam
Updated Branches:
  refs/heads/master 639f9f4b7 -> 415546eda


Making metrics queryable in Python SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5b8cb1e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5b8cb1e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5b8cb1e

Branch: refs/heads/master
Commit: a5b8cb1e11deee7fe7ed152a4718a1f971f58eab
Parents: 639f9f4
Author: Pablo <pabloem@google.com>
Authored: Wed Feb 15 15:16:12 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Feb 24 17:38:26 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    |   9 +-
 sdks/python/apache_beam/examples/wordcount.py   |  11 +-
 sdks/python/apache_beam/metrics/metric.py       |   2 +-
 sdks/python/apache_beam/pipeline_test.py        |  17 +++
 .../runners/dataflow/dataflow_metrics.py        |  80 +++++++++++-
 .../runners/dataflow/dataflow_metrics_test.py   | 128 +++++++++++++++++++
 .../runners/dataflow/dataflow_runner.py         |   9 +-
 .../runners/dataflow/dataflow_runner_test.py    |   5 -
 .../runners/dataflow/internal/apiclient.py      |  13 ++
 9 files changed, 262 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
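
[Editor's note] For readers skimming the diffs, this commit enables the following end-to-end pattern. A minimal sketch assembled from the changes below (assumptions: p is an already-constructed Pipeline whose DoFn increments a counter named 'empty_lines', as in the updated wordcount example):

  import logging

  from apache_beam.metrics.metric import MetricsFilter

  result = p.run()
  result.wait_until_finish()

  # Narrow the query to the 'empty_lines' counter defined in the DoFn.
  empty_lines_filter = MetricsFilter().with_name('empty_lines')
  query_result = result.metrics().query(empty_lines_filter)

  for counter in query_result['counters']:
    logging.info('%s at step %s: committed=%d attempted=%d',
                 counter.key.metric.name, counter.key.step,
                 counter.committed, counter.attempted)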


http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index bb5d5c0..9fa10d3 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -75,6 +75,7 @@ from apache_beam.io import ReadFromText
 from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
@@ -247,8 +248,12 @@ def run(argv=None):
   result = read_from_datastore(gcloud_options.project, known_args,
                                pipeline_options)
 
-  result.metrics().query()
-  #TODO(pabloem)(BEAM-1366) Fix these once metrics are 100% queriable.
+  empty_lines_filter = MetricsFilter().with_name('empty_lines')
+  query_result = result.metrics().query(empty_lines_filter)
+  if query_result['counters']:
+    empty_lines_counter = query_result['counters'][0]
+    logging.info('number of empty lines: %d', empty_lines_counter.committed)
+  # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 4d482b0..50c0328 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -27,6 +27,7 @@ import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -38,6 +39,8 @@ class WordExtractingDoFn(beam.DoFn):
     super(WordExtractingDoFn, self).__init__()
     self.words_counter = Metrics.counter(self.__class__, 'words')
     self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
+    self.word_lengths_dist = Metrics.distribution(
+        self.__class__, 'word_len_dist')
     self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
 
   def process(self, element):
@@ -58,6 +61,7 @@ class WordExtractingDoFn(beam.DoFn):
     for w in words:
       self.words_counter.inc()
       self.word_lengths_counter.inc(len(w))
+      self.word_lengths_dist.update(len(w))
     return words
 
 
@@ -100,7 +104,12 @@ def run(argv=None):
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   result.wait_until_finish()
-  #TODO(pabloem)(BEAM-1366) Add querying of metrics once they are queriable.
+  empty_lines_filter = MetricsFilter().with_name('empty_lines')
+  query_result = result.metrics().query(empty_lines_filter)
+  if query_result['counters']:
+    empty_lines_counter = query_result['counters'][0]
+    logging.info('number of empty lines: %d', empty_lines_counter.committed)
+  # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics.
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
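
[Editor's note] On the new word_len_dist metric above: distributions are recorded during execution but not yet returned by query() (hence the TODO). As a plain-Python sketch of the semantics only, not the SDK's metric cell implementation, each update folds a value into running aggregates:

  class DistributionSketch(object):
    # Illustrative only: the running state a distribution-style metric keeps.
    def __init__(self):
      self.sum = 0
      self.count = 0
      self.min = None
      self.max = None

    def update(self, value):
      self.sum += value
      self.count += 1
      self.min = value if self.min is None else min(self.min, value)
      self.max = value if self.max is None else max(self.max, value)

  dist = DistributionSketch()
  for word in ['hello', 'beam', 'metrics']:
    dist.update(len(word))
  # Now: sum=16, count=3, min=4, max=7.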

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 13ca77b..a0e3cba 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -105,7 +105,7 @@ class MetricResults(object):
     else:
       return False
 
-  def query(self, filter):
+  def query(self, filter=None):
     raise NotImplementedError
 
 
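[Editor's note] For context on the signature change above: query is the contract that runner-specific subclasses fill in (the Dataflow implementation follows below). A toy in-memory illustration, not any real runner's code, of how a subclass is expected to use the inherited matches helper, which the runners rely on to treat a None filter as matching everything:

  from apache_beam.metrics.metric import MetricResults

  class InMemoryMetrics(MetricResults):
    # Illustrative only: serves pre-built MetricResult objects from a list.
    def __init__(self, counter_results):
      super(InMemoryMetrics, self).__init__()
      self._counter_results = counter_results

    def query(self, filter=None):
      # matches(None, key) is True, so an unfiltered query returns everything.
      return {'counters': [c for c in self._counter_results
                           if self.matches(filter, c.key)],
              'distributions': []}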

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 6c2512f..a08edf8 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -21,6 +21,9 @@ import logging
 import platform
 import unittest
 
+from nose.plugins.attrib import attr
+
+from apache_beam.metrics import Metrics
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
@@ -50,6 +53,7 @@ class FakeSource(NativeSource):
 
     def __init__(self, vals):
       self._vals = vals
+      self._output_counter = Metrics.counter('main', 'outputs')
 
     def __enter__(self):
       return self
@@ -59,6 +63,7 @@ class FakeSource(NativeSource):
 
     def __iter__(self):
       for v in self._vals:
+        self._output_counter.inc()
         yield v
 
   def __init__(self, vals):
@@ -132,6 +137,18 @@ class PipelineTest(unittest.TestCase):
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
+  @attr('ValidatesRunner')
+  def test_metrics_in_source(self):
+    pipeline = TestPipeline()
+    pcoll = pipeline | Read(FakeSource([1, 2, 3, 4, 5, 6]))
+    assert_that(pcoll, equal_to([1, 2, 3, 4, 5, 6]))
+    res = pipeline.run()
+    metric_results = res.metrics().query()
+    outputs_counter = metric_results['counters'][0]
+    self.assertEqual(outputs_counter.key.step, 'Read')
+    self.assertEqual(outputs_counter.key.metric.name, 'outputs')
+    self.assertEqual(outputs_counter.committed, 6)
+
   def test_read(self):
     pipeline = TestPipeline()
     pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 1d86f2f..db5a2bc 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -21,13 +21,91 @@ responding to queries of current metrics by going to the dataflow
 service.
 """
 
+from collections import defaultdict
+from warnings import warn
+
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metric import MetricResults
+from apache_beam.metrics.metricbase import MetricName
 
 
 # TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from
 # dataflow service
 class DataflowMetrics(MetricResults):
+  """Implementation of MetricResults class for the Dataflow runner."""
+
+  def __init__(self, dataflow_client=None, job_result=None):
+    """Initialize the Dataflow metrics object.
+
+    Args:
+      dataflow_client: apiclient.DataflowApplicationClient to interact with the
+        dataflow service.
+      job_result: DataflowPipelineResult with the state and id information of
+        the job.
+    """
+    super(DataflowMetrics, self).__init__()
+    self._dataflow_client = dataflow_client
+    self.job_result = job_result
+    self._queried_after_termination = False
+    self._cached_metrics = None
+
+  def _populate_metric_results(self, response):
+    """Take a list of metrics, and convert it to a list of MetricResult."""
+    user_metrics = [metric
+                    for metric in response.metrics
+                    if metric.name.origin == 'user']
+
+    # Get the tentative/committed versions of every metric together.
+    metrics_by_name = defaultdict(lambda: {})
+    for metric in user_metrics:
+      tentative = [prop
+                   for prop in metric.name.context.additionalProperties
+                   if prop.key == 'tentative' and prop.value == 'true']
+      key = 'tentative' if tentative else 'committed'
+      metrics_by_name[metric.name.name][key] = metric
+
+    # Now we create the MetricResult elements.
+    result = []
+    for name, metric in metrics_by_name.iteritems():
+      if (name.endswith('(DIST)') or
+          name.endswith('[MIN]') or
+          name.endswith('[MAX]') or
+          name.endswith('[MEAN]') or
+          name.endswith('[COUNT]')):
+        warn('Distribution metrics will be ignored in the MetricsResult.query '
+             'method. You can see them in the Dataflow User Interface.')
+        # Distributions are not yet fully supported in this runner
+        continue
+      [step, namespace, name] = name.split('/')
+      key = MetricKey(step, MetricName(namespace, name))
+      attempted = metric['tentative'].scalar.integer_value
+      committed = metric['committed'].scalar.integer_value
+      result.append(MetricResult(key, attempted=attempted, committed=committed))
+
+    return result
+
+  def _get_metrics_from_dataflow(self):
+    """Return cached metrics or query the dataflow service."""
+    try:
+      job_id = self.job_result.job_id()
+    except AttributeError:
+      job_id = None
+    if not job_id:
+      raise ValueError('Cannot query metrics. Job id is unknown.')
+
+    if self._cached_metrics:
+      return self._cached_metrics
+
+    job_metrics = self._dataflow_client.get_job_metrics(job_id)
+    # If the job has terminated, metrics will not change and we can cache them.
+    if self.job_result._is_in_terminal_state():
+      self._cached_metrics = job_metrics
+    return job_metrics
 
   def query(self, filter=None):
-    return {'counters': [],
+    response = self._get_metrics_from_dataflow()
+    counters = self._populate_metric_results(response)
+    # TODO(pabloem): Populate distributions once they are available.
+    return {'counters': [c for c in counters if self.matches(filter, c.key)],
             'distributions': []}
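
[Editor's note] The Dataflow service reports each user counter twice, once tentative and once committed, under a step/namespace/name path (see the fixtures in the test module below). A condensed sketch of the pairing logic in _populate_metric_results, operating on plain tuples instead of the API response objects (the tuple input format is illustrative):

  from collections import defaultdict

  # (qualified_name, is_tentative, value), as if extracted from the response.
  raw_metrics = [
      ('split/__main__.WordExtractingDoFn/empty_lines', False, 1080),
      ('split/__main__.WordExtractingDoFn/empty_lines', True, 1080),
  ]

  by_name = defaultdict(dict)
  for qualified_name, is_tentative, value in raw_metrics:
    by_name[qualified_name]['tentative' if is_tentative else 'committed'] = value

  results = []
  for qualified_name, versions in by_name.items():
    step, namespace, name = qualified_name.split('/')
    # Tentative values become 'attempted'; committed values stay 'committed'.
    results.append((step, namespace, name,
                    versions['tentative'], versions['committed']))
  # results == [('split', '__main__.WordExtractingDoFn', 'empty_lines',
  #              1080, 1080)]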

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 5475ac7..8d18fae 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -18,3 +18,131 @@
 Tests corresponding to the DataflowRunner implementation of MetricsResult,
 the DataflowMetrics class.
 """
+import unittest
+
+import mock
+
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.metricbase import MetricName
+from apache_beam.runners.dataflow import dataflow_metrics
+
+
+class DictToObject(object):
+  """Translate from a dict(list()) structure to an object structure"""
+  def __init__(self, data):
+    for name, value in data.iteritems():
+      setattr(self, name, self._wrap(value))
+
+  def _wrap(self, value):
+    if isinstance(value, (tuple, list, set, frozenset)):
+      return type(value)([self._wrap(v) for v in value])
+    else:
+      return DictToObject(value) if isinstance(value, dict) else value
+
+
+class TestDataflowMetrics(unittest.TestCase):
+
+  BASIC_COUNTER_LIST = {"metrics": [
+      {"name": {"context":
+                {"additionalProperties":[
+                    {"key": "original_name",
+                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                              "empty_lines_TentativeAggregateValue"},
+                    {"key": "step", "value": "split"}]},
+                "name": "split/__main__.WordExtractingDoFn/empty_lines",
+                "origin": "user"},
+       "scalar": {"integer_value": 1080},
+       "updateTime": "2017-02-23T01:13:36.659Z"},
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                              "empty_lines_TentativeAggregateValue"},
+                    {"key": "step", "value": "split"},
+                    {"key": "tentative", "value": "true"}]},
+                "name": "split/__main__.WordExtractingDoFn/empty_lines",
+                "origin": "user"},
+       "scalar": {"integer_value": 1080},
+       "updateTime": "2017-02-23T01:13:36.659Z"},
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                              "words_TentativeAggregateValue"},
+                    {"key": "step", "value": "split"}]},
+                "name": "split/__main__.WordExtractingDoFn/words",
+                "origin": "user"},
+       "scalar": {"integer_value": 26181},
+       "updateTime": "2017-02-23T01:13:36.659Z"},
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                              "words_TentativeAggregateValue"},
+                    {"key": "step", "value": "split"},
+                    {"key": "tentative", "value": "true"}]},
+                "name": "split/__main__.WordExtractingDoFn/words",
+                "origin": "user"},
+       "scalar": {"integer_value": 26185},
+       "updateTime": "2017-02-23T01:13:36.659Z"},
+      {"name": {"context":
+                {"additionalProperties": [
+                    {"key": "original_name",
+                     "value": "user-split-split/__main__.WordExtractingDoFn/"
+                              "secretdistribution(DIST)"},
+                    {"key": "step", "value": "split"},
+                    {"key": "tentative", "value": "true"}]},
+                "name":
+                "split/__main__.WordExtractingDoFn/secretdistribution(DIST)",
+                "origin": "user"},
+       "scalar": {"integer_value": 15},
+       "updateTime": "2017-02-23T01:13:36.659Z"}
+  ]}
+
+  def setup_mock_client_result(self):
+    mock_client = mock.Mock()
+    mock_query_result = DictToObject(self.BASIC_COUNTER_LIST)
+    mock_client.get_job_metrics.return_value = mock_query_result
+    mock_job_result = mock.Mock()
+    mock_job_result.job_id.return_value = 1
+    mock_job_result._is_in_terminal_state.return_value = False
+    return mock_client, mock_job_result
+
+  def test_cache_functions(self):
+    mock_client, mock_job_result = self.setup_mock_client_result()
+    dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+
+    # At first creation, we should always query dataflow.
+    self.assertTrue(dm._cached_metrics is None)
+
+    # Right after querying a running job, nothing is cached, so the next
+    # query will hit the service again.
+    dm.query()
+    self.assertTrue(dm._cached_metrics is None)
+
+    # The job has ended. The query should not run again after this.
+    mock_job_result._is_in_terminal_state.return_value = True
+    dm.query()
+    self.assertTrue(dm._cached_metrics)
+
+  def test_query_counters(self):
+    mock_client, mock_job_result = self.setup_mock_client_result()
+    dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result)
+    query_result = dm.query()
+    expected_counters = [
+        MetricResult(
+            MetricKey('split',
+                      MetricName('__main__.WordExtractingDoFn', 'empty_lines')),
+            1080, 1080),
+        MetricResult(
+            MetricKey('split',
+                      MetricName('__main__.WordExtractingDoFn', 'words')),
+            26181, 26185),
+        ]
+    self.assertEqual(sorted(query_result['counters'],
+                            key=lambda x: x.key.metric.name),
+                     sorted(expected_counters,
+                            key=lambda x: x.key.metric.name))
+
+if __name__ == '__main__':
+  unittest.main()
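
[Editor's note] A quick note on the DictToObject helper above: the real API client returns response objects addressed by attribute rather than by key, so the dict fixture is wrapped before being handed to the mock. For illustration, with values drawn from the fixture:

  response = DictToObject({'metrics': [
      {'name': {'origin': 'user'}, 'scalar': {'integer_value': 1080}}]})
  assert response.metrics[0].name.origin == 'user'
  assert response.metrics[0].scalar.integer_value == 1080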

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ebc9024..25f2fd4 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -171,9 +171,13 @@ class DataflowRunner(PipelineRunner):
         pipeline.options, job_version)
 
     # Create the job
-    return DataflowPipelineResult(
+    result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job), self)
 
+    self._metrics = DataflowMetrics(self.dataflow_client, result)
+    result.metric_results = self._metrics
+    return result
+
   def _get_typehint_based_encoding(self, typehint, window_coder):
     """Returns an encoding based on a typehint object."""
     return self._get_cloud_encoding(self._get_coder(typehint,
@@ -634,12 +638,13 @@ class DataflowPipelineResult(PipelineResult):
     """Job is a Job message from the Dataflow API."""
     self._job = job
     self._runner = runner
+    self.metric_results = None
 
   def job_id(self):
     return self._job.id
 
   def metrics(self):
-    return DataflowMetrics()
+    return self.metric_results
 
   @property
   def has_job(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 4a0815a..cc5928a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -55,11 +55,6 @@ class DataflowRunnerTest(unittest.TestCase):
       '--temp_location=/dev/null',
       '--no_auth=True']
 
-  def test_dataflow_runner_has_metrics(self):
-    df_result = DataflowPipelineResult('somejob', 'somerunner')
-    self.assertTrue(df_result.metrics())
-    self.assertTrue(df_result.metrics().query())
-
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum

http://git-wip-us.apache.org/repos/asf/beam/blob/a5b8cb1e/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index d4fa3ce..e980b14 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -436,6 +436,19 @@ class DataflowApplicationClient(object):
     # TODO(silviuc): Remove the debug logging eventually.
     logging.info('JOB: %s', job)
 
+  @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
+  def get_job_metrics(self, job_id):
+    request = dataflow.DataflowProjectsJobsGetMetricsRequest()
+    request.jobId = job_id
+    request.projectId = self.google_cloud_options.project
+    try:
+      response = self._client.projects_jobs.GetMetrics(request)
+    except exceptions.BadStatusCodeError as e:
+      logging.error('HTTP status %d. Unable to query metrics',
+                    e.response.status)
+      raise
+    return response
+
   def submit_job_description(self, job):
     """Creates and excutes a job request."""
     request = dataflow.DataflowProjectsJobsCreateRequest()
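
[Editor's note] About the decorator on get_job_metrics above: with_exponential_backoff retries the call, backing off between attempts. A generic sketch of the idea only, not apache_beam.utils.retry's actual implementation (the real decorator also decides which exceptions are retryable):

  import functools
  import time

  def with_exponential_backoff(num_retries=3, initial_delay_secs=3):
    # Retry the wrapped function, doubling the delay after each failure.
    def decorator(fn):
      @functools.wraps(fn)
      def wrapper(*args, **kwargs):
        delay = initial_delay_secs
        for attempt in range(num_retries + 1):
          try:
            return fn(*args, **kwargs)
          except Exception:
            if attempt == num_retries:
              raise  # Out of retries: surface the last error.
            time.sleep(delay)
            delay *= 2
      return wrapper
    return decorator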

