beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Adding metrics() to DataflowPipelineResult.
Date Thu, 02 Feb 2017 23:22:27 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1a6f2e8f6 -> c01ed083e


Adding metrics() to DataflowPipelineResult.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac4185ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac4185ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac4185ed

Branch: refs/heads/master
Commit: ac4185ed3952397590947d79832fa0b1507cce19
Parents: 1a6f2e8
Author: Pablo <pabloem@google.com>
Authored: Thu Feb 2 13:42:58 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Thu Feb 2 15:08:22 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_metrics.py        | 33 ++++++++++++++++++++
 .../runners/dataflow/dataflow_metrics_test.py   | 20 ++++++++++++
 .../apache_beam/runners/dataflow_runner.py      |  4 +++
 .../apache_beam/runners/dataflow_runner_test.py |  5 +++
 sdks/python/apache_beam/runners/runner.py       |  8 +++++
 5 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
new file mode 100644
index 0000000..1d86f2f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DataflowRunner implementation of MetricResults. It is responsible for
+answering queries about current metrics by querying the Dataflow
+service.
+"""
+
+from apache_beam.metrics.metric import MetricResults
+
+
+# TODO(pabloem)(JIRA-1381): Implement this once metrics are queryable from
+# the Dataflow service.
+class DataflowMetrics(MetricResults):
+  """Placeholder MetricResults for DataflowRunner; reports no metrics."""
+  def query(self, filter=None):
+    return {'counters': [],  # Always empty; `filter` is ignored.
+            'distributions': []}

http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
new file mode 100644
index 0000000..5475ac7
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Tests corresponding to the DataflowRunner implementation of MetricResults,
+the DataflowMetrics class.
+"""

http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index abcc764..f02e24b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -32,6 +32,7 @@ from apache_beam import pvalue
 from apache_beam.internal import json_value
 from apache_beam.internal import pickler
 from apache_beam.pvalue import PCollectionView
+from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
@@ -648,6 +649,9 @@ class DataflowPipelineResult(PipelineResult):
   def job_id(self):
     return self._job.id
 
+  def metrics(self):
+    return DataflowMetrics()  # Does not use self._job yet.
+
   @property
   def has_job(self):
     return self._job is not None

http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
index 4983899..946e6a8 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -27,6 +27,11 @@ from apache_beam.runners.dataflow_runner import DataflowPipelineResult
 
 class DataflowRunnerTest(unittest.TestCase):
 
+  def test_dataflow_runner_has_metrics(self):
+    df_result = DataflowPipelineResult('somejob', 'somerunner')
+    self.assertTrue(df_result.metrics())
+    self.assertTrue(df_result.metrics().query())  # Any non-empty dict passes.
+
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum

http://git-wip-us.apache.org/repos/asf/beam/blob/ac4185ed/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 1a50df4..e14acb1 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -343,6 +343,14 @@ class PipelineResult(object):
     """
     raise NotImplementedError
 
+  def metrics(self):
+    """Returns a MetricResults object to query metrics from the runner.
+
+    Raises:
+      NotImplementedError: If the runner does not support this operation.
+    """
+    raise NotImplementedError
+
   # pylint: disable=unused-argument
   def aggregated_values(self, aggregator_or_name):
     """Return a dict of step names to values of the Aggregator."""


Mime
View raw message