beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [4/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core
Date Thu, 11 May 2017 00:01:49 GMT
[BEAM-2244] Move details of Metrics to Runners Core

Largeish changes this required were:

  - splitting the MetricsContainer into an interface in Java Core with
    an implementation in Runners Core
  - modifying the various *Cell classes to have a name
  - cleaning up dependency cross-fire.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ce5591c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ce5591c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ce5591c

Branch: refs/heads/release-2.0.0
Commit: 0ce5591c85be9d275082095041c55c26143109e7
Parents: e08cac0
Author: bchambers <bchambers@google.com>
Authored: Tue May 9 15:45:50 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed May 10 17:01:32 2017 -0700

----------------------------------------------------------------------
 .../runners/core/metrics/MetricFiltering.java   | 102 ++++
 .../beam/runners/core/metrics/MetricKey.java    |  43 ++
 .../beam/runners/core/metrics/package-info.java |  22 +
 .../core/metrics/MetricFilteringTest.java       | 148 ++++++
 .../apache/beam/runners/core/LateDataUtils.java |   4 +-
 .../beam/runners/core/metrics/CounterCell.java  |  86 ++++
 .../beam/runners/core/metrics/DirtyState.java   |  99 ++++
 .../runners/core/metrics/DistributionCell.java  |  80 +++
 .../runners/core/metrics/DistributionData.java  |  62 +++
 .../beam/runners/core/metrics/GaugeCell.java    |  78 +++
 .../beam/runners/core/metrics/GaugeData.java    |  83 ++++
 .../beam/runners/core/metrics/MetricCell.java   |  42 ++
 .../runners/core/metrics/MetricUpdates.java     |  79 +++
 .../core/metrics/MetricsContainerImpl.java      | 188 +++++++
 .../core/metrics/MetricsContainerStepMap.java   | 495 +++++++++++++++++++
 .../beam/runners/core/metrics/MetricsMap.java   |  88 ++++
 .../beam/runners/core/metrics/package-info.java |  22 +
 .../core/LateDataDroppingDoFnRunnerTest.java    |   9 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  34 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |  10 +-
 .../runners/core/metrics/CounterCellTest.java   |  56 +++
 .../runners/core/metrics/DirtyStateTest.java    |  56 +++
 .../core/metrics/DistributionCellTest.java      |  54 ++
 .../runners/core/metrics/GaugeCellTest.java     |  51 ++
 .../core/metrics/MetricUpdateMatchers.java      |  82 +++
 .../core/metrics/MetricsContainerImplTest.java  | 130 +++++
 .../metrics/MetricsContainerStepMapTest.java    | 272 ++++++++++
 .../runners/core/metrics/MetricsMapTest.java    | 103 ++++
 .../beam/runners/direct/DirectMetrics.java      |  14 +-
 .../runners/direct/StepTransformResult.java     |   2 +-
 .../beam/runners/direct/TransformExecutor.java  |  10 +-
 .../beam/runners/direct/TransformResult.java    |   2 +-
 .../beam/runners/direct/DirectMetricsTest.java  |  14 +-
 .../beam/runners/flink/FlinkRunnerResult.java   |   4 +-
 .../metrics/DoFnRunnerWithMetricsUpdate.java    |   3 +-
 .../flink/metrics/FlinkMetricContainer.java     |   7 +-
 .../flink/metrics/MetricsAccumulator.java       |   2 +-
 .../flink/metrics/ReaderInvocationUtil.java     |   5 +-
 .../beam/runners/dataflow/DataflowMetrics.java  |   4 +-
 .../runners/dataflow/DataflowMetricsTest.java   |   4 +-
 .../beam/runners/spark/SparkPipelineResult.java |   2 +-
 .../apache/beam/runners/spark/io/SourceRDD.java |   2 +-
 .../runners/spark/io/SparkUnboundedSource.java  |   2 +-
 .../spark/metrics/MetricsAccumulator.java       |   2 +-
 .../spark/metrics/MetricsAccumulatorParam.java  |   2 +-
 .../runners/spark/metrics/SparkBeamMetric.java  |   4 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  10 +-
 .../spark/stateful/StateSpecFunctions.java      |   2 +-
 .../translation/DoFnRunnerWithMetrics.java      |   5 +-
 .../spark/translation/MultiDoFnFunction.java    |   2 +-
 .../spark/translation/TransformTranslator.java  |   2 +-
 .../streaming/StreamingTransformTranslator.java |   2 +-
 .../ResumeFromCheckpointStreamingTest.java      |   2 +-
 .../streaming/StreamingSourceMetricsTest.java   |   2 +-
 .../apache/beam/sdk/metrics/CounterCell.java    |  72 ---
 .../org/apache/beam/sdk/metrics/DirtyState.java |  99 ----
 .../beam/sdk/metrics/DistributionCell.java      |  74 ---
 .../beam/sdk/metrics/DistributionData.java      |  60 ---
 .../beam/sdk/metrics/DistributionResult.java    |   3 +
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |  70 ---
 .../org/apache/beam/sdk/metrics/GaugeData.java  |  81 ---
 .../apache/beam/sdk/metrics/GaugeResult.java    |   3 +
 .../org/apache/beam/sdk/metrics/Metric.java     |   7 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |  53 --
 .../beam/sdk/metrics/MetricFiltering.java       |  99 ----
 .../org/apache/beam/sdk/metrics/MetricKey.java  |  41 --
 .../apache/beam/sdk/metrics/MetricUpdates.java  |  78 ---
 .../org/apache/beam/sdk/metrics/Metrics.java    |   8 +-
 .../beam/sdk/metrics/MetricsContainer.java      | 146 +-----
 .../sdk/metrics/MetricsContainerStepMap.java    | 487 ------------------
 .../beam/sdk/metrics/MetricsEnvironment.java    |   9 +-
 .../org/apache/beam/sdk/metrics/MetricsMap.java |  87 ----
 .../apache/beam/sdk/metrics/SinkMetrics.java    |   4 +
 .../apache/beam/sdk/metrics/SourceMetrics.java  |   3 +
 .../beam/sdk/metrics/CounterCellTest.java       |  55 ---
 .../apache/beam/sdk/metrics/DirtyStateTest.java |  56 ---
 .../beam/sdk/metrics/DistributionCellTest.java  |  53 --
 .../apache/beam/sdk/metrics/GaugeCellTest.java  |  49 --
 .../beam/sdk/metrics/MetricFilteringTest.java   | 145 ------
 .../apache/beam/sdk/metrics/MetricMatchers.java | 242 ---------
 .../beam/sdk/metrics/MetricResultsMatchers.java | 190 +++++++
 .../metrics/MetricsContainerStepMapTest.java    | 258 ----------
 .../beam/sdk/metrics/MetricsContainerTest.java  | 129 -----
 .../sdk/metrics/MetricsEnvironmentTest.java     |  23 +-
 .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 ----
 .../apache/beam/sdk/metrics/MetricsTest.java    |  43 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  54 +-
 87 files changed, 2899 insertions(+), 2675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
new file mode 100644
index 0000000..d469d20
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.Objects;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Implements matching for metrics filters. Specifically, matching for metric name,
+ * namespace, and step name.
+ */
+public class MetricFiltering {
+
+  private MetricFiltering() { }
+
+  /** Matching logic is implemented here rather than in MetricsFilter because we would like
+   *  MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
+   *  a Proto/JSON/etc. schema object.
+   * @param filter {@link MetricsFilter} with the matching information of an actual metric
+   * @param key {@link MetricKey} with the information of a metric
+   * @return whether the filter matches the key or not
+   */
+  public static boolean matches(MetricsFilter filter, MetricKey key) {
+    return filter == null
+        || (matchesName(key.metricName(), filter.names())
+        && matchesScope(key.stepName(), filter.steps()));
+  }
+
+  /**
+   * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
+   * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
+   * but not "a/fool/bar/b" or "a/foo/bart/b".
+   */
+  public static boolean subPathMatches(String haystack, String needle) {
+    int location = haystack.indexOf(needle);
+    int end = location + needle.length();
+    if (location == -1) {
+      return false;  // needle not found
+    } else if (location != 0 && haystack.charAt(location - 1) != '/') {
+      return false; // the first entry in needle wasn't exactly matched
+    } else if (end != haystack.length() && haystack.charAt(end) != '/') {
+      return false; // the last entry in needle wasn't exactly matched
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
+   * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
+   * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D"), or
+   * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
+  public static boolean matchesScope(String actualScope, Set<String> scopes) {
+    if (scopes.isEmpty() || scopes.contains(actualScope)) {
+      return true;
+    }
+
+    // If there is no perfect match, a stage name-level match is tried.
+    // This is done by a substring search over the levels of the scope.
+    // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
+    for (String scope : scopes) {
+      if (subPathMatches(actualScope, scope)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private static boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
+    if (nameFilters.isEmpty()) {
+      return true;
+    }
+    for (MetricNameFilter nameFilter : nameFilters) {
+      if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
+          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
new file mode 100644
index 0000000..58d4055
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Metrics are keyed by the step name they are associated with and the name of the metric.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricKey implements Serializable {
+
+  /** The step name that is associated with this metric. */
+  public abstract String stepName();
+
+  /** The name of the metric. */
+  public abstract MetricName metricName();
+
+  public static MetricKey create(String stepName, MetricName metricName) {
+    return new AutoValue_MetricKey(stepName, metricName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
new file mode 100644
index 0000000..263a705
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for runners to implement metrics.
+ */
+package org.apache.beam.runners.core.metrics;

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
new file mode 100644
index 0000000..8953f21
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricFiltering}.
+ */
+@RunWith(JUnit4.class)
+public class MetricFilteringTest {
+  private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+
+
+  private boolean matchesSubPath(String actualScope, String subPath) {
+    return MetricFiltering.subPathMatches(actualScope, subPath);
+  }
+
+  @Test
+  public void testMatchCompositeStepNameFilters() {
+    // MetricsFilter with a Class-namespace + name filter + step filter.
+    // Successful match.
+    assertTrue(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+            .addStep("myStep").build(),
+        MetricKey.create(
+            "myBigStep/myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+
+    // Unsuccessful match.
+    assertFalse(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+            .addStep("myOtherStep").build(),
+        MetricKey.create(
+            "myOtherStepNoMatch/myStep",
+            MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+  }
+
+  @Test
+  public void testMatchStepNameFilters() {
+    // MetricsFilter with a Class-namespace + name filter + step filter.
+    // Successful match.
+    assertTrue(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+        .addStep("myStep").build(),
+        MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+
+    // Unsuccessful match.
+    assertFalse(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+        .addStep("myOtherStep").build(),
+        MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+  }
+
+  @Test
+  public void testMatchClassNamespaceFilters() {
+    // MetricsFilter with a Class-namespace + name filter. Without step filter.
+    // Successful match.
+    assertTrue(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(),
+        MetricKey.create("anyStep", MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+
+    // Unsuccessful match.
+    assertFalse(MetricFiltering.matches(
+        MetricsFilter.builder().addNameFilter(
+            MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(),
+        MetricKey.create("anyStep", MetricName.named(MetricFiltering.class, "myMetricName"))));
+  }
+
+  @Test
+  public void testMatchStringNamespaceFilters() {
+    // MetricsFilter with a String-namespace + name filter. Without step filter.
+    // Successful match.
+    assertTrue(
+        MetricFiltering.matches(
+            MetricsFilter.builder().addNameFilter(
+                MetricNameFilter.named("myNamespace", "myMetricName")).build(),
+            MetricKey.create("anyStep", MetricName.named("myNamespace", "myMetricName"))));
+
+    // Unsuccessful match.
+    assertFalse(
+        MetricFiltering.matches(
+            MetricsFilter.builder().addNameFilter(
+                MetricNameFilter.named("myOtherNamespace", "myMetricName")).build(),
+            MetricKey.create("anyStep", MetricName.named("myNamespace", "myMetricname"))));
+  }
+
+  @Test
+  public void testMatchesSubPath() {
+    assertTrue("Match of the first element",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue("Match of the first elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue("Match of the last elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+    assertFalse("Substring match but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+    assertFalse("Substring match from start - but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+  }
+
+  private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
+    Set<String> scopeFilter = new HashSet<String>();
+    scopeFilter.add(filter);
+    return MetricFiltering.matchesScope(actualScope, scopeFilter);
+  }
+
+  @Test
+  public void testMatchesScope() {
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue(matchesScopeWithSingleFilter(
+        "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
+    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
+    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index f7c0d31..732e60c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -22,7 +22,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -71,7 +71,7 @@ public class LateDataUtils {
                         .isBefore(timerInternals.currentInputWatermarkTime());
                 if (expired) {
                   // The element is too late for this window.
-                  droppedDueToLateness.update(1L);
+                  droppedDueToLateness.inc();
                   WindowTracing.debug(
                       "GroupAlsoByWindow: Dropping element at {} for key: {}; "
                           + "window: {} since it is too far behind inputWatermark: {}",

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
new file mode 100644
index 0000000..4378bb9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a Counter metric for a specific context and bundle.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a counter is being reported for a specific step (rather than the counter in the current context).
+ * In that case retrieving the underlying cell and reporting directly to it avoids a step of
+ * indirection.
+ */
+@Experimental(Kind.METRICS)
+public class CounterCell implements Counter, MetricCell<Long> {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicLong value = new AtomicLong();
+  private final MetricName name;
+
+  /**
+   * Package-visibility because all {@link CounterCell CounterCells} should be created by
+   * {@link MetricsContainerImpl#getCounter(MetricName)}.
+   */
+  CounterCell(MetricName name) {
+    this.name = name;
+  }
+
+  /**
+   * Increment the counter by the given amount.
+   * @param n value to increment by. Can be negative to decrement.
+   */
+  @Override
+  public void inc(long n) {
+    value.addAndGet(n);
+    dirty.afterModification();
+  }
+
+  public void inc() {
+    inc(1);
+  }
+
+  public void dec() {
+    inc(-1);
+  }
+
+  public void dec(long n) {
+    inc(-1 * n);
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public Long getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
new file mode 100644
index 0000000..532fc2a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Atomically tracks the dirty-state of a metric.
+ *
+ * <p>Reporting an update is split into two parts such that only changes made before the call to
+ * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for
+ * a two-step commit process of gathering all the dirty updates (calling {@link #beforeCommit()})
+ * followed by committing and calling {@link #afterCommit()}.
+ *
+ * <p>The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()}
+ * will return true (indicating a dirty metric) even if there have been no changes since the last
+ * commit.
+ *
+ * <p>There is also a possible race when the underlying metric is modified but the call to
+ * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this
+ * case the next round of metric updating will see the changes. If this was for the final commit,
+ * then the metric updates shouldn't be extracted until all possible user modifications have
+ * completed.
+ */
+@Experimental(Kind.METRICS)
+class DirtyState implements Serializable {
+  private enum State {
+    /** Indicates that there have been changes to the MetricCell since last commit. */
+    DIRTY,
+    /** Indicates that there have been no changes to the MetricCell since last commit. */
+    CLEAN,
+    /** Indicates that a commit of the current value is in progress. */
+    COMMITTING
+  }
+
+  private final AtomicReference<State> dirty = new AtomicReference<>(State.DIRTY);
+
+  /**
+   * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}.
+   *
+   * <p>Should be called <b>after</b> modification of the value.
+   */
+  public void afterModification() {
+    dirty.set(State.DIRTY);
+  }
+
+  /**
+   * Check the dirty state and mark the metric as committing.
+   *
+   * <p>If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY}
+   * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}.
+   *
+   * @return {@code false} if the state is clean and {@code true} otherwise.
+   */
+  public boolean beforeCommit() {
+    // After this loop, we want the state to be either CLEAN or COMMITTING.
+    // If the state was CLEAN, we don't need to do anything (and exit the loop early)
+    // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only
+    // fail if another thread is getting updates which generally shouldn't be the case.
+    // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will
+    // fail if another thread commits updates (which shouldn't be the case) or if the user code
+    // updates the metric, in which case it will transition to DIRTY and the next iteration will
+    // successfully update it.
+    State state;
+    do {
+      state = dirty.get();
+    } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING));
+
+    return state != State.CLEAN;
+  }
+
+  /**
+   * Mark any changes up to the most recent call to {@link #beforeCommit()} as committed.
+   * The next call to {@link #beforeCommit()} will return {@code false} unless there have
+   * been changes made since the previous call to {@link #beforeCommit()}.
+   */
+  public void afterCommit() {
+    dirty.compareAndSet(State.COMMITTING, State.CLEAN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
new file mode 100644
index 0000000..5a5099a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a Distribution metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a distribution is being reported for a specific step (rather than the distribution in the current
+ * context). In that case retrieving the underlying cell and reporting directly to it avoids a step
+ * of indirection.
+ */
+@Experimental(Kind.METRICS)
+public class DistributionCell implements Distribution, MetricCell<DistributionData> {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<DistributionData> value =
+      new AtomicReference<>(DistributionData.EMPTY);
+  private final MetricName name;
+
+  /**
+   * Package-visibility because all {@link DistributionCell DistributionCells} should be created by
+   * {@link MetricsContainerImpl#getDistribution(MetricName)}.
+   */
+  DistributionCell(MetricName name) {
+    this.name = name;
+  }
+
+  /** Update the distribution with the given observed value. */
+  @Override
+  public void update(long n) {
+    update(DistributionData.singleton(n));
+  }
+
+  void update(DistributionData data) {
+    DistributionData original;
+    do {
+      original = value.get();
+    } while (!value.compareAndSet(original, original.combine(data)));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public DistributionData getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
new file mode 100644
index 0000000..099e63f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.DistributionResult;
+
+/**
+ * Data describing the distribution. This should retain enough detail that it can be combined
+ * with other {@link DistributionData}.
+ *
+ * <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
+ * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
+ * the approximate value of those quantiles.
+ */
+@AutoValue
+public abstract class DistributionData implements Serializable {
+
+  public abstract long sum();
+  public abstract long count();
+  public abstract long min();
+  public abstract long max();
+
+  public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
+
+  public static DistributionData create(long sum, long count, long min, long max) {
+    return new AutoValue_DistributionData(sum, count, min, max);
+  }
+
+  public static DistributionData singleton(long value) {
+    return create(value, 1, value, value);
+  }
+
+  public DistributionData combine(DistributionData value) {
+    return create(
+        sum() + value.sum(),
+        count() + value.count(),
+        Math.min(value.min(), min()),
+        Math.max(value.max(), max()));
+  }
+
+  public DistributionResult extractResult() {
+    return DistributionResult.create(sum(), count(), min(), max());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
new file mode 100644
index 0000000..795e826
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a {@link Gauge} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a gauge is being reported for a specific step (rather than the gauge in the current
+ * context). In that case retrieving the underlying cell and reporting directly to it avoids a step
+ * of indirection.
+ */
+@Experimental(Experimental.Kind.METRICS)
+public class GaugeCell implements Gauge, MetricCell<GaugeData> {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
+  private final MetricName name;
+
+  /**
+   * Package-visibility because all {@link GaugeCell GaugeCells} should be created by
+   * {@link MetricsContainerImpl#getGauge(MetricName)}.
+   */
+  GaugeCell(MetricName name) {
+    this.name = name;
+  }
+
+  /** Set the gauge to the given value. */
+  @Override
+  public void set(long value) {
+    update(GaugeData.create(value));
+  }
+
+  void update(GaugeData data) {
+    GaugeData original;
+    do {
+      original = gaugeValue.get();
+    } while (!gaugeValue.compareAndSet(original, original.combine(data)));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public GaugeData getCumulative() {
+    return gaugeValue.get();
+  }
+
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
new file mode 100644
index 0000000..15a353f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.joda.time.Instant;
+
+/**
+ * Data describing the gauge. This should retain enough detail that it can be combined with
+ * other {@link GaugeData}.
+ */
+@AutoValue
+public abstract class GaugeData implements Serializable {
+
+  public abstract long value();
+
+  public abstract Instant timestamp();
+
+  public static GaugeData create(long value) {
+    return new AutoValue_GaugeData(value, Instant.now());
+  }
+
+  public static GaugeData empty() {
+    return EmptyGaugeData.INSTANCE;
+  }
+
+  public GaugeData combine(GaugeData other) {
+    if (this.timestamp().isAfter(other.timestamp())) {
+      return this;
+    } else {
+      return other;
+    }
+  }
+
+  public GaugeResult extractResult() {
+    return GaugeResult.create(value(), timestamp());
+  }
+
+  /**
+   * Empty {@link GaugeData}, representing no values reported.
+   */
+  public static class EmptyGaugeData extends GaugeData {
+
+    private static final EmptyGaugeData INSTANCE = new EmptyGaugeData();
+    private static final Instant EPOCH = new Instant(0);
+
+    private EmptyGaugeData() {
+    }
+
+    @Override
+    public long value() {
+      return -1L;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return EPOCH;
+    }
+
+    @Override
+    public GaugeResult extractResult() {
+      return GaugeResult.empty();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
new file mode 100644
index 0000000..2b773f0
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
+ * specific metric name in a single context.
+ *
+ * @param <DataT> The type of metric data stored (and extracted) from this cell.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricCell<DataT> extends Serializable {
+  /**
+   * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
+   */
+  DirtyState getDirty();
+
+  /**
+   * Return the cumulative value of this metric.
+   */
+  DataT getCumulative();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
new file mode 100644
index 0000000..eae3305
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Representation of multiple metric updates.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricUpdates {
+
+  public static final MetricUpdates EMPTY = MetricUpdates.create(
+      Collections.<MetricUpdate<Long>>emptyList(),
+      Collections.<MetricUpdate<DistributionData>>emptyList(),
+      Collections.<MetricUpdate<GaugeData>>emptyList());
+
+  /**
+   * Representation of a single metric update.
+   * @param <T> The type of value representing the update.
+   */
+  @AutoValue
+  public abstract static class MetricUpdate<T> implements Serializable {
+
+    /** The key being updated. */
+    public abstract MetricKey getKey();
+    /** The value of the update. */
+    public abstract T getUpdate();
+
+    public static <T> MetricUpdate<T> create(MetricKey key, T update) {
+      return new AutoValue_MetricUpdates_MetricUpdate(key, update);
+    }
+  }
+
+  /** Returns true if there are no counter, distribution, or gauge updates in this object. */
+  public boolean isEmpty() {
+    return Iterables.isEmpty(counterUpdates()) && Iterables.isEmpty(distributionUpdates())
+        && Iterables.isEmpty(gaugeUpdates());
+  }
+
+  /** All of the counter updates. */
+  public abstract Iterable<MetricUpdate<Long>> counterUpdates();
+
+  /** All of the distribution updates. */
+  public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
+
+  /** All of the gauge updates. */
+  public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();
+
+  /** Create a new {@link MetricUpdates} bundle. */
+  public static MetricUpdates create(
+      Iterable<MetricUpdate<Long>> counterUpdates,
+      Iterable<MetricUpdate<DistributionData>> distributionUpdates,
+      Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
+    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
new file mode 100644
index 0000000..6967bf0
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+
+/**
+ * Holds the metrics for a single step and unit-of-commit (bundle).
+ *
+ * <p>This class is thread-safe. It is intended to be used when 1 (or more) threads are updating
+ * metrics and at most 1 thread is extracting updates by calling {@link #getUpdates} and
+ * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction
+ * may cause updates that don't actually have any changes, it will never lose an update.
+ *
+ * <p>For consistency, all threads that update metrics should finish before getting the final
+ * cumulative values/updates.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsContainerImpl implements Serializable, MetricsContainer {
+
+  private final String stepName;
+
+  private MetricsMap<MetricName, CounterCell> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
+        @Override
+        public CounterCell createInstance(MetricName name) {
+          return new CounterCell(name);
+        }
+      });
+
+  private MetricsMap<MetricName, DistributionCell> distributions =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
+        @Override
+        public DistributionCell createInstance(MetricName name) {
+          return new DistributionCell(name);
+        }
+      });
+
+  private MetricsMap<MetricName, GaugeCell> gauges =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() {
+        @Override
+        public GaugeCell createInstance(MetricName name) {
+          return new GaugeCell(name);
+        }
+      });
+
+  /**
+   * Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}.
+   */
+  public MetricsContainerImpl(String stepName) {
+    this.stepName = stepName;
+  }
+
+  @Override
+  public CounterCell getCounter(MetricName metricName) {
+    return counters.get(metricName);
+  }
+
+  @Override
+  public DistributionCell getDistribution(MetricName metricName) {
+    return distributions.get(metricName);
+  }
+
+  @Override
+  public GaugeCell getGauge(MetricName metricName) {
+    return gauges.get(metricName);
+  }
+
+  private <UpdateT, CellT extends MetricCell<UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      if (cell.getValue().getDirty().beforeCommit()) {
+        updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()),
+            cell.getValue().getCumulative()));
+      }
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the cumulative values for any metrics that have changed since the last time updates were
+   * committed.
+   */
+  public MetricUpdates getUpdates() {
+    return MetricUpdates.create(
+        extractUpdates(counters),
+        extractUpdates(distributions),
+        extractUpdates(gauges));
+  }
+
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
+    for (MetricCell<?> cell : cells.values()) {
+      cell.getDirty().afterCommit();
+    }
+  }
+
+  /**
+   * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
+   * committed.
+   */
+  public void commitUpdates() {
+    commitUpdates(counters);
+    commitUpdates(distributions);
+    commitUpdates(gauges);
+  }
+
+  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
+      MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      UpdateT update = checkNotNull(cell.getValue().getCumulative());
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
+   * container.
+   */
+  public MetricUpdates getCumulative() {
+    return MetricUpdates.create(
+        extractCumulatives(counters),
+        extractCumulatives(distributions),
+        extractCumulatives(gauges));
+  }
+
+  /**
+   * Update values of this {@link MetricsContainerImpl} by merging the values of another container.
+   */
+  public void update(MetricsContainerImpl other) {
+    updateCounters(counters, other.counters);
+    updateDistributions(distributions, other.distributions);
+    updateGauges(gauges, other.gauges);
+  }
+
+  private void updateCounters(
+      MetricsMap<MetricName, CounterCell> current,
+      MetricsMap<MetricName, CounterCell> updates) {
+    for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) {
+      current.get(counter.getKey()).inc(counter.getValue().getCumulative());
+    }
+  }
+
+  private void updateDistributions(
+      MetricsMap<MetricName, DistributionCell> current,
+      MetricsMap<MetricName, DistributionCell> updates) {
+    for (Map.Entry<MetricName, DistributionCell> counter : updates.entries()) {
+      current.get(counter.getKey()).update(counter.getValue().getCumulative());
+    }
+  }
+
+  private void updateGauges(
+      MetricsMap<MetricName, GaugeCell> current,
+      MetricsMap<MetricName, GaugeCell> updates) {
+    for (Map.Entry<MetricName, GaugeCell> counter : updates.entries()) {
+      current.get(counter.getKey()).update(counter.getValue().getCumulative());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
new file mode 100644
index 0000000..68238e4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Metrics containers by step.
+ *
+ * <p>This class is not thread-safe.</p>
+ */
+public class MetricsContainerStepMap implements Serializable {
+  private Map<String, MetricsContainerImpl> metricsContainers;
+
+  public MetricsContainerStepMap() {
+    this.metricsContainers = new ConcurrentHashMap<>();
+  }
+
+  /** Returns the container for the given step name, creating and registering it if absent. */
+  public MetricsContainerImpl getContainer(String stepName) {
+    MetricsContainerImpl container = metricsContainers.get(stepName);
+    if (container == null) {
+      container = new MetricsContainerImpl(stepName);
+      metricsContainers.put(stepName, container);
+    }
+    return container;
+  }
+
+  /**
+   * Update this {@link MetricsContainerStepMap} with all values from given
+   * {@link MetricsContainerStepMap}.
+   */
+  public void updateAll(MetricsContainerStepMap other) {
+    for (Map.Entry<String, MetricsContainerImpl> container : other.metricsContainers.entrySet()) {
+      getContainer(container.getKey()).update(container.getValue());
+    }
+  }
+
+  /**
+   * Update {@link MetricsContainerImpl} for given step in this map with all values from given
+   * {@link MetricsContainerImpl}.
+   */
+  public void update(String step, MetricsContainerImpl container) {
+    getContainer(step).update(container);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given
+   * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and committed metrics.
+   *
+   * <p>This factory method is intended for runners which support both attempted and committed
+   * metrics.
+   */
+  public static MetricResults asMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers,
+      MetricsContainerStepMap committedMetricsContainers) {
+    return new MetricsContainerStepMapMetricResults(
+        attemptedMetricsContainers,
+        committedMetricsContainers);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted
+   * metrics.
+   *
+   * <p>This factory method is intended for runners which only support {@code attempted} metrics.
+   * Accessing {@link MetricResult#committed()} in the resulting {@link MetricResults} will result
+   * in an {@link UnsupportedOperationException}.</p>
+   */
+  public static MetricResults asAttemptedOnlyMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers) {
+    return new MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
+  }
+
+  private Map<String, MetricsContainerImpl> getMetricsContainers() {
+    return metricsContainers;
+  }
+
+  private static class MetricsContainerStepMapMetricResults extends MetricResults {
+    private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions =
+        new HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = new HashMap<>();
+    private final boolean isCommittedSupported;
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers) {
+      this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers) {
+      this(attemptedMetricsContainers, committedMetricsContainers, true);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers,
+        boolean isCommittedSupported) {
+      for (MetricsContainerImpl container
+          : attemptedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), attemptedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            attemptedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), attemptedGaugeUpdateFn());
+      }
+      for (MetricsContainerImpl container
+          : committedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), committedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            committedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), committedGaugeUpdateFn());
+      }
+      this.isCommittedSupported = isCommittedSupported;
+    }
+
+    private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+    attemptedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, DistributionData.EMPTY));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+    committedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, DistributionData.EMPTY),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    attemptedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, GaugeData.empty()));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    committedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, GaugeData.empty()),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> attemptedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, 0L));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> committedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, 0L),
+              input);
+        }
+      };
+    }
+
+    @Override
+    public MetricQueryResults queryMetrics(MetricsFilter filter) {
+      return new QueryResults(filter);
+    }
+
+    private class QueryResults implements MetricQueryResults {
+      private final MetricsFilter filter;
+
+      private QueryResults(MetricsFilter filter) {
+        this.filter = filter;
+      }
+
+      @Override
+      public Iterable<MetricResult<Long>> counters() {
+        return
+            FluentIterable
+                .from(counters.values())
+                .filter(matchesFilter(filter))
+                .transform(counterUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<DistributionResult>> distributions() {
+        return
+            FluentIterable
+                .from(distributions.values())
+                .filter(matchesFilter(filter))
+                .transform(distributionUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<GaugeResult>> gauges() {
+        return
+            FluentIterable
+                .from(gauges.values())
+                .filter(matchesFilter(filter))
+                .transform(gaugeUpdateToResult())
+                .toList();
+      }
+
+      private Predicate<AttemptedAndCommitted<?>> matchesFilter(final MetricsFilter filter) {
+        return new Predicate<AttemptedAndCommitted<?>>() {
+          @Override
+          public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) {
+            return MetricFiltering.matches(filter, attemptedAndCommitted.getKey());
+          }
+        };
+      }
+    }
+
+    private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> counterUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
+            @Override
+            public MetricResult<Long>
+            apply(AttemptedAndCommitted<Long> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>
+    distributionUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>() {
+            @Override
+            public MetricResult<DistributionResult>
+            apply(AttemptedAndCommitted<DistributionData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>
+    gaugeUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>() {
+            @Override
+            public MetricResult<GaugeResult>
+            apply(AttemptedAndCommitted<GaugeData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeCounters(
+        Map<MetricKey, AttemptedAndCommitted<Long>> counters,
+        Iterable<MetricUpdate<Long>> updates,
+        Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<Long> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<Long> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (counters.containsKey(key)) {
+          AttemptedAndCommitted<Long> current = counters.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate() + current.getAttempted().getUpdate()),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate() + current.getCommitted().getUpdate()));
+        }
+        counters.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeDistributions(
+        Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
+        Iterable<MetricUpdate<DistributionData>> updates,
+        Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<DistributionData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<DistributionData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (distributions.containsKey(key)) {
+          AttemptedAndCommitted<DistributionData> current = distributions.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        distributions.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeGauges(
+        Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
+        Iterable<MetricUpdate<GaugeData>> updates,
+        Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<GaugeData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<GaugeData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (gauges.containsKey(key)) {
+          AttemptedAndCommitted<GaugeData> current = gauges.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        gauges.put(key, update);
+      }
+    }
+
+    /**
+     * Accumulated implementation of {@link MetricResult}.
+     */
+    private static class AccumulatedMetricResult<T> implements MetricResult<T> {
+      private final MetricName name;
+      private final String step;
+      private final T attempted;
+      private final T committed;
+      private final boolean isCommittedSupported;
+
+      private AccumulatedMetricResult(
+          MetricName name,
+          String step,
+          T attempted,
+          T committed,
+          boolean isCommittedSupported) {
+        this.name = name;
+        this.step = step;
+        this.attempted = attempted;
+        this.committed = committed;
+        this.isCommittedSupported = isCommittedSupported;
+      }
+
+      @Override
+      public MetricName name() {
+        return name;
+      }
+
+      @Override
+      public String step() {
+        return step;
+      }
+
+      @Override
+      public T committed() {
+        if (!isCommittedSupported) {
+          throw new UnsupportedOperationException("This runner does not currently support committed"
+              + " metrics results. Please use 'attempted' instead.");
+        }
+        return committed;
+      }
+
+      @Override
+      public T attempted() {
+        return attempted;
+      }
+    }
+
+    /**
+     * Attempted and committed {@link MetricUpdate MetricUpdates}.
+     */
+    private static class AttemptedAndCommitted<T> {
+      private final MetricKey key;
+      private final MetricUpdate<T> attempted;
+      private final MetricUpdate<T> committed;
+
+      private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
+          MetricUpdate<T> committed) {
+        this.key = key;
+        this.attempted = attempted;
+        this.committed = committed;
+      }
+
+      private MetricKey getKey() {
+        return key;
+      }
+
+      private MetricUpdate<T> getAttempted() {
+        return attempted;
+      }
+
+      private MetricUpdate<T> getCommitted() {
+        return committed;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
new file mode 100644
index 0000000..9f08076
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A map from {@code K} to {@code T} that supports getting or creating values associated with a key
+ * in a thread-safe manner.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsMap<K, T> implements Serializable {
+
+  /** Interface for creating instances to populate the {@link MetricsMap}. */
+  public interface Factory<K, T> extends Serializable {
+    /**
+     * Create an instance of {@code T} to use with the given {@code key}.
+     *
+     * <p>It must be safe to call this from multiple threads.
+     */
+    T createInstance(K key);
+  }
+
+  private final Factory<K, T> factory;
+  private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
+
+  public MetricsMap(Factory<K, T> factory) {
+    this.factory = factory;
+  }
+
+  /** Get the value associated with the given key, creating it with the factory if absent. */
+  public T get(K key) {
+    T metric = metrics.get(key);
+    if (metric == null) {
+      metric = factory.createInstance(key);
+      // If another thread registered a value for this key first, putIfAbsent returns that
+      // value and it is used; the instance created above is then discarded.
+      metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), metric);
+    }
+    return metric;
+  }
+
+  /**
+   * Get the value associated with the given key, or {@code null} if none exists.
+   */
+  @Nullable
+  public T tryGet(K key) {
+    return metrics.get(key);
+  }
+
+  /**
+   * Return an unmodifiable iterable over the entries in the current {@link MetricsMap}.
+   */
+  public Iterable<Map.Entry<K, T>> entries() {
+    return Iterables.unmodifiableIterable(metrics.entrySet());
+  }
+
+  /**
+   * Return an unmodifiable iterable over the values in the current {@link MetricsMap}.
+   */
+  public Iterable<T> values() {
+    return Iterables.unmodifiableIterable(metrics.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
new file mode 100644
index 0000000..263a705
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for runners to implement metrics.
+ */
+package org.apache.beam.runners.core.metrics;

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 88c6ab6..9ee79d7 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -58,7 +58,8 @@ public class LateDataDroppingDoFnRunnerTest {
 
   @Test
   public void testLateDataFilter() throws Exception {
-    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
     when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
 
     LateDataFilter lateDataFilter = new LateDataFilter(
@@ -77,14 +78,14 @@ public class LateDataDroppingDoFnRunnerTest {
         createDatum(16, 16L),
         createDatum(18, 18L));
     assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
-    long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+    long droppedValues = container.getCounter(
         MetricName.named(LateDataDroppingDoFnRunner.class,
             LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
         .getCumulative().longValue();
     assertEquals(1, droppedValues);
     // Ensure that reiterating returns the same results and doesn't increment the counter again.
     assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
-    droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+    droppedValues = container.getCounter(
         MetricName.named(LateDataDroppingDoFnRunner.class,
             LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
         .getCumulative().longValue();


Mime
View raw message