beam-commits mailing list archives

From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2096] Make DataflowMetrics more resilient
Date Thu, 27 Apr 2017 20:10:40 GMT
Repository: beam
Updated Branches:
  refs/heads/master 884935cb9 -> fdf2de999


[BEAM-2096] Make DataflowMetrics more resilient

DataflowMetrics seems to have many hard-coded assumptions about what will be
returned by the Dataflow service, which will likely break when users
use new types of metrics or if the Dataflow service makes minor adjustments
in how it sends metrics back to the user.

In order for code to continue working in these cases, handle errors by logging
and skipping.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e047b69e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e047b69e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e047b69e

Branch: refs/heads/master
Commit: e047b69efab9c988011303cf2eda86ac408b38c2
Parents: 884935c
Author: Dan Halperin <dhalperi@google.com>
Authored: Thu Apr 27 08:30:47 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Apr 27 13:10:22 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowMetrics.java  | 38 ++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e047b69e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 7633a56..aa80959 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -136,22 +136,32 @@ class DataflowMetrics extends MetricResults {
         ImmutableList.builder();
     ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
     for (MetricKey metricKey : metricHashKeys) {
-      String metricName = metricKey.metricName().name();
-      if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
-          || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
-        // Skip distribution metrics, as these are not yet properly supported.
-        LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow"
-            + "User Interface");
+      if (!MetricFiltering.matches(filter, metricKey)) {
+        // Skip unmatched metrics early.
         continue;
       }
-      String namespace = metricKey.metricName().namespace();
-      String step = metricKey.stepName();
-      Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue();
-      Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue();
-      if (MetricFiltering.matches(filter, metricKey)) {
-        counterResults.add(DataflowMetricResult.create(
-            MetricName.named(namespace, metricName),
-            step, committed, attempted));
+
+      // This code is not robust to evolutions in the types of metrics that can be returned, so
+      // wrap it in a try-catch and log errors.
+      try {
+        String metricName = metricKey.metricName().name();
+        if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
+            || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
+          // Skip distribution metrics, as these are not yet properly supported.
+          LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow"
+              + "User Interface");
+          continue;
+        }
+
+        String namespace = metricKey.metricName().namespace();
+        String step = metricKey.stepName();
+        Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue();
+        Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue();
+        counterResults.add(
+            DataflowMetricResult.create(
+                MetricName.named(namespace, metricName), step, committed, attempted));
+      } catch (Exception e) {
+        LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter);
       }
     }
     return DataflowMetricQueryResults.create(
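
The log-and-skip pattern this commit introduces can be illustrated in isolation. The following is a minimal sketch, not Beam's actual code: the class SkipOnErrorSketch, the method extractCounters, and the use of a plain Map in place of the Dataflow service response are all hypothetical stand-ins. Unlike the LOG.warn call in the diff, the sketch also includes the caught exception in the log line, which is an assumption about what might help debugging rather than something the commit does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not Beam's DataflowMetrics.
class SkipOnErrorSketch {

  /**
   * Converts each raw scalar to a long. Entries that cannot be converted are
   * logged and skipped, and their names are recorded in {@code skipped}.
   */
  static List<Long> extractCounters(Map<String, Object> scalarsByName, List<String> skipped) {
    List<Long> results = new ArrayList<>();
    for (Map.Entry<String, Object> entry : scalarsByName.entrySet()) {
      try {
        // The cast throws ClassCastException for non-numeric values;
        // longValue() throws NullPointerException when the value is null.
        results.add(((Number) entry.getValue()).longValue());
      } catch (Exception e) {
        // One bad entry should not fail the whole query: log it and move on.
        skipped.add(entry.getKey());
        System.err.println(
            "Error handling metric " + entry.getKey() + ", skipping result: " + e);
      }
    }
    return results;
  }
}
```

The try block wraps only the per-entry work, so a failure on one metric leaves the results collected for the others intact, which mirrors the placement of the try-catch inside the for loop in the diff.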

