beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [19/36] beam git commit: [BEAM-2783] support metrics in MapReduceRunner.
Date Thu, 07 Sep 2017 18:39:28 GMT
[BEAM-2783] support metrics in MapReduceRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c2390a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c2390a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c2390a1

Branch: refs/heads/mr-runner
Commit: 6c2390a1f7d7d912d186e84eed18f94e36d2a65f
Parents: b87ae78
Author: Pei He <pei@apache.org>
Authored: Wed Aug 30 17:11:07 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Aug 31 14:13:49 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineResult.java      |  61 +++++++++++
 .../beam/runners/mapreduce/MapReduceRunner.java |   6 +-
 .../GroupAlsoByWindowsParDoOperation.java       |   3 +-
 .../mapreduce/translation/JobPrototype.java     |   8 +-
 .../translation/MapReduceMetricResults.java     | 106 +++++++++++++++++++
 .../mapreduce/translation/MetricsReporter.java  |  97 +++++++++++++++++
 .../translation/NormalParDoOperation.java       |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |  33 +++++-
 .../mapreduce/translation/ParDoTranslator.java  |   1 +
 .../ReifyTimestampAndWindowsParDoOperation.java |   3 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  16 ++-
 11 files changed, 325 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
new file mode 100644
index 0000000..90c521a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.MapReduceMetricResults;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Duration;
+
+public class MapReducePipelineResult implements PipelineResult {
+
+  private final List<Job> jobs;
+  public MapReducePipelineResult(List<Job> jobs) {
+    this.jobs = checkNotNull(jobs, "jobs");
+  }
+
+  @Override
+  public State getState() {
+    return State.DONE;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.DONE;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    return new MapReduceMetricResults(jobs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 3f76808..88ed01e 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
@@ -76,6 +78,7 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult>
{
 
     fusedGraph.getFusedSteps();
 
+    List<Job> jobs = new ArrayList<>();
     int stageId = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
       Configuration config = new Configuration();
@@ -87,11 +90,12 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult>
{
       try {
         Job job = jobPrototype.build(options.getJarClass(), config);
         job.waitForCompletion(true);
+        jobs.add(job);
       } catch (Exception e) {
         Throwables.throwIfUnchecked(e);
         throw new RuntimeException(e);
       }
     }
-    return null;
+    return new MapReducePipelineResult(jobs);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
index 768f17c..14e3a29 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -35,11 +35,12 @@ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
   private final Coder<?> inputCoder;
 
   public GroupAlsoByWindowsParDoOperation(
+      String stepName,
       PipelineOptions options,
       WindowingStrategy<?, ?> windowingStrategy,
       Coder<?> inputCoder,
       Graphs.Tag outTag) {
-    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+    super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
         ImmutableList.<Graphs.Tag>of(), windowingStrategy);
     this.inputCoder = checkNotNull(inputCoder, "inputCoder");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 9f291d5..39487fd 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -117,15 +117,17 @@ public class JobPrototype {
       Graphs.Step reifyStep = Graphs.Step.of(
           reifyStepName,
           new ReifyTimestampAndWindowsParDoOperation(
-              options, operation.getWindowingStrategy(), reifyOutputTag));
+              reifyStepName, options, operation.getWindowingStrategy(), reifyOutputTag));
 
       Graphs.Step writeStep = Graphs.Step.of(
           groupByKey.getFullName() + "-Write",
           new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder));
 
+      String gabwStepName = groupByKey.getFullName() + "-GroupAlsoByWindows";
       Graphs.Step gabwStep = Graphs.Step.of(
-          groupByKey.getFullName() + "-GroupAlsoByWindows",
-          new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, kvCoder, gbkOutTag));
+          gabwStepName,
+          new GroupAlsoByWindowsParDoOperation(
+              gabwStepName, options, windowingStrategy, kvCoder, gbkOutTag));
 
       fusedStep.addStep(
           reifyStep, fusedStep.getInputTags(groupByKey), ImmutableList.of(reifyOutputTag));

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
new file mode 100644
index 0000000..1d1c9ff
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MapReduceMetricResults.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Implementation of {@link MetricResults} for the MapReduce Runner.
+ */
+public class MapReduceMetricResults extends MetricResults {
+
+  private final List<Job> jobs;
+
+  public MapReduceMetricResults(List<Job> jobs) {
+    this.jobs = checkNotNull(jobs, "jobs");
+  }
+
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    List<MetricResult<Long>> counters = new ArrayList<>();
+    for (Job job : jobs) {
+      Iterable<CounterGroup> groups;
+      try {
+        groups = job.getCounters();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      for (CounterGroup group : groups) {
+        String groupName = group.getName();
+        for (Counter counter : group) {
+          MetricKey metricKey = MetricsReporter.toMetricKey(groupName, counter.getName());
+          if (!MetricFiltering.matches(filter, metricKey)) {
+            continue;
+          }
+          counters.add(
+              MapReduceMetricResult.create(
+                  metricKey.metricName(),
+                  metricKey.stepName(),
+                  counter.getValue()));
+        }
+      }
+    }
+    return MapReduceMetricQueryResults.create(counters);
+  }
+
+
+  @AutoValue
+  abstract static class MapReduceMetricQueryResults implements MetricQueryResults {
+
+    public abstract @Nullable Iterable<MetricResult<DistributionResult>> distributions();
+    public abstract @Nullable Iterable<MetricResult<GaugeResult>> gauges();
+
+    public static MetricQueryResults create(Iterable<MetricResult<Long>> counters)
{
+      return new AutoValue_MapReduceMetricResults_MapReduceMetricQueryResults(
+          counters, null, null);
+    }
+  }
+
+  @AutoValue
+  abstract static class MapReduceMetricResult<T> implements MetricResult<T> {
+    // need to define these here so they appear in the correct order
+    // and the generated constructor is usable and consistent
+    public abstract MetricName name();
+    public abstract String step();
+    public abstract @Nullable T committed();
+    public abstract T attempted();
+
+    public static <T> MetricResult<T> create(MetricName name, String step, T
attempted) {
+      return new AutoValue_MapReduceMetricResults_MapReduceMetricResult<T>(
+          name, step, null, attempted);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
new file mode 100644
index 0000000..9fe139d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/MetricsReporter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to MapReduce framework.
+ */
+public class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String METRIC_PREFIX = "__metrics";
+
+  private final TaskAttemptContext context;
+  private final MetricsContainerStepMap metricsContainers;
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+
+  MetricsReporter(TaskAttemptContext context) {
+    this.context = checkNotNull(context, "context");
+    this.metricsContainers = new MetricsContainerStepMap();
+  }
+
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String reportedCounterKey = reportedCounterKey(metricResult);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(reportedCounterKey);
+
+      if (oldValue == null || oldValue < updateValue) {
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        context.getCounter(groupName(metricResult), metricResult.name().name())
+            .increment(incValue);
+        reportedCounters.put(reportedCounterKey, updateValue);
+      }
+    }
+  }
+
+  private String groupName(MetricResult<?> metricResult) {
+    return METRIC_PREFIX
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace();
+  }
+
+  private String reportedCounterKey(MetricResult<?> metricResult) {
+    return metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+
+  public static MetricKey toMetricKey(String groupName, String counterName) {
+    String[] nameSplits = groupName.split(METRIC_KEY_SEPARATOR);
+    int length = nameSplits.length;
+    String stepName = length > 1 ? nameSplits[length - 2] : "";
+    String namespace = length > 0 ? nameSplits[length - 1] : "";
+    return MetricKey.create(stepName, MetricName.named(namespace, counterName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
index 58a7d6d..8b730ff 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -33,13 +33,14 @@ public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT
   private final DoFn<InputT, OutputT> doFn;
 
   public NormalParDoOperation(
+      String stepName,
       DoFn<InputT, OutputT> doFn,
       PipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       List<Graphs.Tag> sideInputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy);
+    super(stepName, options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy);
     this.doFn = checkNotNull(doFn, "doFn");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 947d773..2c2fbde 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -28,6 +30,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -41,6 +44,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  * Operation for ParDo.
  */
 public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
{
+  private final String stepName;
   protected final SerializedPipelineOptions options;
   protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
@@ -48,17 +52,19 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
   private final List<Graphs.Tag> sideInputTags;
   private Map<TupleTag<?>, String> tupleTagToFilePath;
 
-
+  private MetricsReporter metricsReporter;
   protected DoFnInvoker<InputT, OutputT> doFnInvoker;
   private DoFnRunner<InputT, OutputT> fnRunner;
 
   public ParDoOperation(
+      String stepName,
       PipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       List<Graphs.Tag> sideInputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
     super(1 + sideOutputTags.size());
+    this.stepName = checkNotNull(stepName, "stepName");
     this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
@@ -74,6 +80,8 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
   @Override
   public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext)
{
     super.start(taskContext);
+    this.metricsReporter = new MetricsReporter(taskContext);
+
     DoFn<InputT, OutputT> doFn = getDoFn();
     // Process user's setup
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
@@ -94,7 +102,13 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
         sideOutputTags,
         null,
         windowingStrategy);
-    fnRunner.startBundle();
+
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -102,12 +116,23 @@ public abstract class ParDoOperation<InputT, OutputT> extends
Operation<InputT>
    */
   @Override
   public void process(WindowedValue<InputT> elem) {
-    fnRunner.processElement(elem);
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void finish() {
-    fnRunner.finishBundle();
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      fnRunner.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    metricsReporter.updateMetrics();
     doFnInvoker.invokeTeardown();
     super.finish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
index ae23f71..e866fe2 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -31,6 +31,7 @@ class ParDoTranslator<InputT, OutputT>
       ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
     NormalParDoOperation operation = new NormalParDoOperation(
+        transform.getName(),
         transform.getFn(),
         userGraphContext.getOptions(),
         transform.getMainOutputTag(),

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index 9a63b05..9d6b895 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -32,10 +32,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
 
   public ReifyTimestampAndWindowsParDoOperation(
+      String stepName,
       PipelineOptions options,
       WindowingStrategy<?, ?> windowingStrategy,
       Graphs.Tag outTag) {
-    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+    super(stepName, options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
         ImmutableList.<Graphs.Tag>of(), windowingStrategy);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index 363ba01..263905c 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -18,9 +18,13 @@
 package org.apache.beam.runners.mapreduce;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -51,11 +55,14 @@ public class WordCountTest {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+    private final Counter nonEmptyLines = Metrics.counter(ExtractWordsFn.class, "nonEmptyLines");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.inc();
+      } else {
+        nonEmptyLines.inc();
       }
 
       // Split the line into words.
@@ -98,6 +105,13 @@ public class WordCountTest {
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(output));
 
-    p.run();
+    PipelineResult result = p.run();
+    Iterable<MetricResult<Long>> counters = result.metrics()
+        .queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(MetricNameFilter.named(ExtractWordsFn.class, "emptyLines"))
+                .build())
+        .counters();
+    System.out.println(counters.iterator().next());
   }
 }


Mime
View raw message