beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core
Date Thu, 11 May 2017 00:01:46 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 e08cac055 -> 7485583a3


http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
deleted file mode 100644
index a0dd119..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import java.util.Objects;
-import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-
-/**
- * Matchers for metrics.
- */
-public class MetricMatchers {
-
-  /**
-   * Matches a {@link MetricUpdate} with the given name and contents.
-   *
-   * <p>Visible since it may be used in runner-specific tests.
-   */
-  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) {
-    return new TypeSafeMatcher<MetricUpdate<T>>() {
-      @Override
-      protected boolean matchesSafely(MetricUpdate<T> item) {
-        return Objects.equals(name, item.getKey().metricName().name())
-            && Objects.equals(update, item.getUpdate());
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricUpdate{name=").appendValue(name)
-            .appendText(", update=").appendValue(update)
-            .appendText("}");
-      }
-    };
-  }
-
-  /**
-   * Matches a {@link MetricUpdate} with the given namespace, name, step and contents.
-   *
-   * <p>Visible since it may be used in runner-specific tests.
-   */
-  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
-      final String namespace, final String name, final String step, final T update) {
-    return new TypeSafeMatcher<MetricUpdate<T>>() {
-      @Override
-      protected boolean matchesSafely(MetricUpdate<T> item) {
-        return Objects.equals(namespace, item.getKey().metricName().namespace())
-            && Objects.equals(name, item.getKey().metricName().name())
-            && Objects.equals(step, item.getKey().stepName())
-            && Objects.equals(update, item.getUpdate());
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricUpdate{inNamespace=").appendValue(namespace)
-            .appendText(", name=").appendValue(name)
-            .appendText(", step=").appendValue(step)
-            .appendText(", update=").appendValue(update)
-            .appendText("}");
-      }
-    };
-  }
-
-  /**
-   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
-   * the given value for attempted metrics.
-   */
-  public static <T> Matcher<MetricResult<T>> attemptedMetricsResult(
-      final String namespace, final String name, final String step, final T value) {
-    return metricsResult(namespace, name, step, value, false);
-  }
-
-  /**
-   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
-   * the given value for committed metrics.
-   */
-  public static <T> Matcher<MetricResult<T>> committedMetricsResult(
-      final String namespace, final String name, final String step, final T value) {
-    return metricsResult(namespace, name, step, value, true);
-  }
-
-  /**
-   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
-   * the given value for either committed or attempted (based on {@code isCommitted}) metrics.
-   */
-  public static <T> Matcher<MetricResult<T>> metricsResult(
-      final String namespace, final String name, final String step, final T value,
-      final boolean isCommitted) {
-    final String metricState = isCommitted ? "committed" : "attempted";
-    return new TypeSafeMatcher<MetricResult<T>>() {
-      @Override
-      protected boolean matchesSafely(MetricResult<T> item) {
-        final T metricValue = isCommitted ? item.committed() : item.attempted();
-        return Objects.equals(namespace, item.name().namespace())
-            && Objects.equals(name, item.name().name())
-            && item.step().contains(step)
-            && metricResultsEqual(value, metricValue);
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricResult{inNamespace=").appendValue(namespace)
-            .appendText(", name=").appendValue(name)
-            .appendText(", step=").appendValue(step)
-            .appendText(String.format(", %s=", metricState)).appendValue(value)
-            .appendText("}");
-      }
-
-      @Override
-      protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
-        mismatchDescription.appendText("MetricResult{");
-        final T metricValue = isCommitted ? item.committed() : item.attempted();
-
-        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-
-        if (!Objects.equals(value, metricValue)) {
-          mismatchDescription
-              .appendText(String.format("%s: ", metricState)).appendValue(value)
-              .appendText(" != ").appendValue(metricValue);
-        }
-
-        mismatchDescription.appendText("}");
-      }
-    };
-  }
-
-  private static <T> boolean metricResultsEqual(T result1, T result2) {
-    if (result1 instanceof GaugeResult) {
-      return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value());
-    } else {
-      return result1.equals(result2);
-    }
-  }
-
-  static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
-      final String namespace, final String name, final String step,
-      final Long attemptedMin, final Long attemptedMax) {
-    return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false);
-  }
-
-  static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax(
-      final String namespace, final String name, final String step,
-      final Long committedMin, final Long committedMax) {
-    return distributionMinMax(namespace, name, step, committedMin, committedMax, true);
-  }
-
-  static Matcher<MetricResult<DistributionResult>> distributionMinMax(
-      final String namespace, final String name, final String step,
-      final Long min, final Long max, final boolean isCommitted) {
-    final String metricState = isCommitted ? "committed" : "attempted";
-    return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
-      @Override
-      protected boolean matchesSafely(MetricResult<DistributionResult> item) {
-        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
-        return Objects.equals(namespace, item.name().namespace())
-            && Objects.equals(name, item.name().name())
-            && item.step().contains(step)
-            && Objects.equals(min, metricValue.min())
-            && Objects.equals(max, metricValue.max());
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricResult{inNamespace=").appendValue(namespace)
-            .appendText(", name=").appendValue(name)
-            .appendText(", step=").appendValue(step)
-            .appendText(String.format(", %sMin=", metricState)).appendValue(min)
-            .appendText(String.format(", %sMax=", metricState)).appendValue(max)
-            .appendText("}");
-      }
-
-      @Override
-      protected void describeMismatchSafely(MetricResult<DistributionResult> item,
-          Description mismatchDescription) {
-        mismatchDescription.appendText("MetricResult{");
-
-        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
-
-        if (!Objects.equals(min, metricValue.min())) {
-          mismatchDescription
-              .appendText(String.format("%sMin: ", metricState)).appendValue(min)
-              .appendText(" != ").appendValue(metricValue.min());
-        }
-
-        if (!Objects.equals(max, metricValue.max())) {
-          mismatchDescription
-              .appendText(String.format("%sMax: ", metricState)).appendValue(max)
-              .appendText(" != ").appendValue(metricValue.max());
-        }
-
-        mismatchDescription.appendText("}");
-      }
-    };
-  }
-
-  private static <T> void describeMetricsResultMembersMismatch(
-      MetricResult<T> item,
-      Description mismatchDescription,
-      String namespace,
-      String name,
-      String step) {
-    if (!Objects.equals(namespace, item.name().namespace())) {
-      mismatchDescription
-          .appendText("inNamespace: ").appendValue(namespace)
-          .appendText(" != ").appendValue(item.name().namespace());
-    }
-
-    if (!Objects.equals(name, item.name().name())) {
-      mismatchDescription
-          .appendText("name: ").appendValue(name)
-          .appendText(" != ").appendValue(item.name().name());
-    }
-
-    if (!item.step().contains(step)) {
-      mismatchDescription
-          .appendText("step: ").appendValue(step)
-          .appendText(" != ").appendValue(item.step());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
new file mode 100644
index 0000000..5031952
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.Objects;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matchers for {@link MetricResults}.
+ */
+public class MetricResultsMatchers {
+
+  /**
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for attempted metrics.
+   */
+  public static <T> Matcher<MetricResult<T>> attemptedMetricsResult(
+      final String namespace, final String name, final String step, final T value) {
+    return metricsResult(namespace, name, step, value, false);
+  }
+
+  /**
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for committed metrics.
+   */
+  public static <T> Matcher<MetricResult<T>> committedMetricsResult(
+      final String namespace, final String name, final String step, final T value) {
+    return metricsResult(namespace, name, step, value, true);
+  }
+
+  /**
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for either committed or attempted (based on {@code isCommitted}) metrics.
+   */
+  public static <T> Matcher<MetricResult<T>> metricsResult(
+      final String namespace, final String name, final String step, final T value,
+      final boolean isCommitted) {
+    final String metricState = isCommitted ? "committed" : "attempted";
+    return new TypeSafeMatcher<MetricResult<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<T> item) {
+        final T metricValue = isCommitted ? item.committed() : item.attempted();
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && item.step().contains(step)
+            && metricResultsEqual(value, metricValue);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(String.format(", %s=", metricState)).appendValue(value)
+            .appendText("}");
+      }
+
+      @Override
+      protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
+        mismatchDescription.appendText("MetricResult{");
+        final T metricValue = isCommitted ? item.committed() : item.attempted();
+
+        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
+
+        if (!Objects.equals(value, metricValue)) {
+          mismatchDescription
+              .appendText(String.format("%s: ", metricState)).appendValue(value)
+              .appendText(" != ").appendValue(metricValue);
+        }
+
+        mismatchDescription.appendText("}");
+      }
+    };
+  }
+
+  private static <T> boolean metricResultsEqual(T result1, T result2) {
+    if (result1 instanceof GaugeResult) {
+      return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value());
+    } else {
+      return result1.equals(result2);
+    }
+  }
+
+  static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
+      final String namespace, final String name, final String step,
+      final Long attemptedMin, final Long attemptedMax) {
+    return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false);
+  }
+
+  static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax(
+      final String namespace, final String name, final String step,
+      final Long committedMin, final Long committedMax) {
+    return distributionMinMax(namespace, name, step, committedMin, committedMax, true);
+  }
+
+  public static Matcher<MetricResult<DistributionResult>> distributionMinMax(
+      final String namespace, final String name, final String step,
+      final Long min, final Long max, final boolean isCommitted) {
+    final String metricState = isCommitted ? "committed" : "attempted";
+    return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<DistributionResult> item) {
+        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && item.step().contains(step)
+            && Objects.equals(min, metricValue.min())
+            && Objects.equals(max, metricValue.max());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(String.format(", %sMin=", metricState)).appendValue(min)
+            .appendText(String.format(", %sMax=", metricState)).appendValue(max)
+            .appendText("}");
+      }
+
+      @Override
+      protected void describeMismatchSafely(MetricResult<DistributionResult> item,
+          Description mismatchDescription) {
+        mismatchDescription.appendText("MetricResult{");
+
+        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
+        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
+
+        if (!Objects.equals(min, metricValue.min())) {
+          mismatchDescription
+              .appendText(String.format("%sMin: ", metricState)).appendValue(min)
+              .appendText(" != ").appendValue(metricValue.min());
+        }
+
+        if (!Objects.equals(max, metricValue.max())) {
+          mismatchDescription
+              .appendText(String.format("%sMax: ", metricState)).appendValue(max)
+              .appendText(" != ").appendValue(metricValue.max());
+        }
+
+        mismatchDescription.appendText("}");
+      }
+    };
+  }
+
+  private static <T> void describeMetricsResultMembersMismatch(
+      MetricResult<T> item,
+      Description mismatchDescription,
+      String namespace,
+      String name,
+      String step) {
+    if (!Objects.equals(namespace, item.name().namespace())) {
+      mismatchDescription
+          .appendText("inNamespace: ").appendValue(namespace)
+          .appendText(" != ").appendValue(item.name().namespace());
+    }
+
+    if (!Objects.equals(name, item.name().name())) {
+      mismatchDescription
+          .appendText("name: ").appendValue(name)
+          .appendText(" != ").appendValue(item.name().name());
+    }
+
+    if (!item.step().contains(step)) {
+      mismatchDescription
+          .appendText("step: ").appendValue(step)
+          .appendText(" != ").appendValue(item.step());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
deleted file mode 100644
index 0428ce1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
-import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults;
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import java.io.Closeable;
-import java.io.IOException;
-import org.hamcrest.collection.IsIterableWithSize;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Tests for {@link MetricsContainerStepMap}.
- */
-public class MetricsContainerStepMapTest {
-
-  private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName();
-  private static final String STEP1 = "myStep1";
-  private static final String STEP2 = "myStep2";
-
-  private static final long VALUE = 100;
-
-  private static final Counter counter =
-      Metrics.counter(
-          MetricsContainerStepMapTest.class,
-          "myCounter");
-  private static final Distribution distribution =
-      Metrics.distribution(
-          MetricsContainerStepMapTest.class,
-          "myDistribution");
-  private static final Gauge gauge =
-      Metrics.gauge(
-          MetricsContainerStepMapTest.class,
-          "myGauge");
-
-  private static final MetricsContainer metricsContainer;
-
-  static {
-    metricsContainer = new MetricsContainer(null);
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
-      counter.inc(VALUE);
-      distribution.update(VALUE);
-      distribution.update(VALUE * 2);
-      gauge.set(VALUE);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testAttemptedAccumulatedMetricResults() {
-    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
-    attemptedMetrics.update(STEP1, metricsContainer);
-    attemptedMetrics.update(STEP2, metricsContainer);
-    attemptedMetrics.update(STEP2, metricsContainer);
-
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(attemptedMetrics);
-
-    MetricQueryResults step1res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
-
-    assertIterableSize(step1res.counters(), 1);
-    assertIterableSize(step1res.distributions(), 1);
-    assertIterableSize(step1res.gauges(), 1);
-
-    assertCounter(step1res, STEP1, VALUE, false);
-    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
-        false);
-    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
-
-    MetricQueryResults step2res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
-
-    assertIterableSize(step2res.counters(), 1);
-    assertIterableSize(step2res.distributions(), 1);
-    assertIterableSize(step2res.gauges(), 1);
-
-    assertCounter(step2res, STEP2, VALUE * 2, false);
-    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
-        false);
-    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
-
-    MetricQueryResults allres =
-        metricResults.queryMetrics(MetricsFilter.builder().build());
-
-    assertIterableSize(allres.counters(), 2);
-    assertIterableSize(allres.distributions(), 2);
-    assertIterableSize(allres.gauges(), 2);
-  }
-
-  @Test
-  public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
-    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
-    attemptedMetrics.update(STEP1, metricsContainer);
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(attemptedMetrics);
-
-    MetricQueryResults step1res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
-
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("This runner does not currently support committed metrics results.");
-
-    assertCounter(step1res, STEP1, VALUE, true);
-  }
-
-  @Test
-  public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
-    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
-    attemptedMetrics.update(STEP1, metricsContainer);
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(attemptedMetrics);
-
-    MetricQueryResults step1res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
-
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("This runner does not currently support committed metrics results.");
-
-    assertDistribution(step1res, STEP1, DistributionResult.ZERO, true);
-  }
-
-  @Test
-  public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
-    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
-    attemptedMetrics.update(STEP1, metricsContainer);
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(attemptedMetrics);
-
-    MetricQueryResults step1res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
-
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("This runner does not currently support committed metrics results.");
-
-    assertGauge(step1res, STEP1, GaugeResult.empty(), true);
-  }
-
-  @Test
-  public void testAttemptedAndCommittedAccumulatedMetricResults() {
-    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
-    attemptedMetrics.update(STEP1, metricsContainer);
-    attemptedMetrics.update(STEP1, metricsContainer);
-    attemptedMetrics.update(STEP2, metricsContainer);
-    attemptedMetrics.update(STEP2, metricsContainer);
-    attemptedMetrics.update(STEP2, metricsContainer);
-
-    MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap();
-    committedMetrics.update(STEP1, metricsContainer);
-    committedMetrics.update(STEP2, metricsContainer);
-    committedMetrics.update(STEP2, metricsContainer);
-
-    MetricResults metricResults =
-        asMetricResults(attemptedMetrics, committedMetrics);
-
-    MetricQueryResults step1res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
-
-    assertIterableSize(step1res.counters(), 1);
-    assertIterableSize(step1res.distributions(), 1);
-    assertIterableSize(step1res.gauges(), 1);
-
-    assertCounter(step1res, STEP1, VALUE * 2, false);
-    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
-        false);
-    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
-
-    assertCounter(step1res, STEP1, VALUE, true);
-    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
-        true);
-    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true);
-
-    MetricQueryResults step2res =
-        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
-
-    assertIterableSize(step2res.counters(), 1);
-    assertIterableSize(step2res.distributions(), 1);
-    assertIterableSize(step2res.gauges(), 1);
-
-    assertCounter(step2res, STEP2, VALUE * 3, false);
-    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2),
-        false);
-    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
-
-    assertCounter(step2res, STEP2, VALUE * 2, true);
-    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
-        true);
-    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true);
-
-    MetricQueryResults allres =
-        metricResults.queryMetrics(MetricsFilter.builder().build());
-
-    assertIterableSize(allres.counters(), 2);
-    assertIterableSize(allres.distributions(), 2);
-    assertIterableSize(allres.gauges(), 2);
-  }
-
-  private <T> void assertIterableSize(Iterable<T> iterable, int size) {
-    assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size));
-  }
-
-  private void assertCounter(
-      MetricQueryResults metricQueryResults,
-      String step,
-      Long expected,
-      boolean isCommitted) {
-    assertThat(
-        metricQueryResults.counters(),
-        hasItem(metricsResult(NAMESPACE, counter.getName().name(), step, expected, isCommitted)));
-  }
-
-  private void assertDistribution(
-      MetricQueryResults metricQueryResults,
-      String step,
-      DistributionResult expected,
-      boolean isCommitted) {
-    assertThat(
-        metricQueryResults.distributions(),
-        hasItem(metricsResult(NAMESPACE, distribution.getName().name(), step, expected,
-            isCommitted)));
-  }
-
-  private void assertGauge(
-      MetricQueryResults metricQueryResults,
-      String step,
-      GaugeResult expected,
-      boolean isCommitted) {
-    assertThat(
-        metricQueryResults.gauges(),
-        hasItem(metricsResult(NAMESPACE, gauge.getName().name(), step, expected, isCommitted)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
deleted file mode 100644
index 38c00d3..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link MetricsContainer}.
- */
-@RunWith(JUnit4.class)
-public class MetricsContainerTest {
-
-  @Test
-  public void testCounterDeltas() {
-    MetricsContainer container = new MetricsContainer("step1");
-    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
-    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
-    assertThat("All counters should start out dirty",
-        container.getUpdates().counterUpdates(), containsInAnyOrder(
-        metricUpdate("name1", 0L),
-        metricUpdate("name2", 0L)));
-    container.commitUpdates();
-    assertThat("After commit no counters should be dirty",
-        container.getUpdates().counterUpdates(), emptyIterable());
-
-    c1.update(5L);
-    c2.update(4L);
-
-    assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
-        metricUpdate("name1", 5L),
-        metricUpdate("name2", 4L)));
-
-    assertThat("Since we haven't committed, updates are still included",
-        container.getUpdates().counterUpdates(), containsInAnyOrder(
-        metricUpdate("name1", 5L),
-        metricUpdate("name2", 4L)));
-
-    container.commitUpdates();
-    assertThat("After commit there are no updates",
-        container.getUpdates().counterUpdates(), emptyIterable());
-
-    c1.update(8L);
-    assertThat(container.getUpdates().counterUpdates(), contains(
-        metricUpdate("name1", 13L)));
-  }
-
-  @Test
-  public void testCounterCumulatives() {
-    MetricsContainer container = new MetricsContainer("step1");
-    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
-    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
-    c1.update(2L);
-    c2.update(4L);
-    c1.update(3L);
-
-    container.getUpdates();
-    container.commitUpdates();
-    assertThat("Committing updates shouldn't affect cumulative counter values",
-        container.getCumulative().counterUpdates(), containsInAnyOrder(
-        metricUpdate("name1", 5L),
-        metricUpdate("name2", 4L)));
-
-    c1.update(8L);
-    assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
-        metricUpdate("name1", 13L),
-        metricUpdate("name2", 4L)));
-  }
-
-  @Test
-  public void testDistributionDeltas() {
-    MetricsContainer container = new MetricsContainer("step1");
-    DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1"));
-    DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2"));
-
-    assertThat("Initial update includes initial zero-values",
-        container.getUpdates().distributionUpdates(), containsInAnyOrder(
-        metricUpdate("name1", DistributionData.EMPTY),
-        metricUpdate("name2", DistributionData.EMPTY)));
-
-    container.commitUpdates();
-    assertThat("No updates after commit",
-        container.getUpdates().distributionUpdates(), emptyIterable());
-
-    c1.update(5L);
-    c2.update(4L);
-
-    assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder(
-        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
-        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
-    assertThat("Updates stay the same without commit",
-        container.getUpdates().distributionUpdates(), containsInAnyOrder(
-        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
-        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
-
-    container.commitUpdates();
-    assertThat("No updatess after commit",
-        container.getUpdates().distributionUpdates(), emptyIterable());
-
-    c1.update(8L);
-    c1.update(4L);
-    assertThat(container.getUpdates().distributionUpdates(), contains(
-        metricUpdate("name1", DistributionData.create(17, 3, 4, 8))));
-    container.commitUpdates();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 0ce17b4..a29c13b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.beam.sdk.metrics;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
-import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link MetricsEnvironment}.
@@ -41,8 +42,13 @@ public class MetricsEnvironmentTest {
   @Test
   public void testUsesAppropriateMetricsContainer() {
     Counter counter = Metrics.counter("ns", "name");
-    MetricsContainer c1 = new MetricsContainer("step1");
-    MetricsContainer c2 = new MetricsContainer("step2");
+
+    MetricsContainer c1 = Mockito.mock(MetricsContainer.class);
+    MetricsContainer c2 = Mockito.mock(MetricsContainer.class);
+    Counter counter1 = Mockito.mock(Counter.class);
+    Counter counter2 = Mockito.mock(Counter.class);
+    when(c1.getCounter(MetricName.named("ns", "name"))).thenReturn(counter1);
+    when(c2.getCounter(MetricName.named("ns", "name"))).thenReturn(counter2);
 
     MetricsEnvironment.setCurrentContainer(c1);
     counter.inc();
@@ -50,10 +56,9 @@ public class MetricsEnvironmentTest {
     counter.dec();
     MetricsEnvironment.setCurrentContainer(null);
 
-    MetricUpdates updates1 = c1.getUpdates();
-    MetricUpdates updates2 = c2.getUpdates();
-    assertThat(updates1.counterUpdates(), contains(metricUpdate("ns", "name", "step1", 1L)));
-    assertThat(updates2.counterUpdates(), contains(metricUpdate("ns", "name", "step2", -1L)));
+    verify(counter1).inc(1L);
+    verify(counter2).inc(-1L);
+    verifyNoMoreInteractions(counter1, counter2);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
deleted file mode 100644
index 4104f8d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.metrics;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.Assert.assertThat;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-/**
- * Tests for {@link MetricsMap}.
- */
-@RunWith(JUnit4.class)
-public class MetricsMapTest {
-
-  public MetricsMap<String, AtomicLong> metricsMap =
-      new MetricsMap<>(new MetricsMap.Factory<String, AtomicLong>() {
-    @Override
-    public AtomicLong createInstance(String unusedKey) {
-      return new AtomicLong();
-    }
-  });
-
-  @Test
-  public void testCreateSeparateInstances() {
-    AtomicLong foo = metricsMap.get("foo");
-    AtomicLong bar = metricsMap.get("bar");
-
-    assertThat(foo, not(sameInstance(bar)));
-  }
-
-  @Test
-  public void testReuseInstances() {
-    AtomicLong foo1 = metricsMap.get("foo");
-    AtomicLong foo2 = metricsMap.get("foo");
-
-    assertThat(foo1, sameInstance(foo2));
-  }
-
-  @Test
-  public void testGet() {
-    assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class));
-
-    AtomicLong foo = metricsMap.get("foo");
-    assertThat(metricsMap.tryGet("foo"), sameInstance(foo));
-  }
-
-  @Test
-  public void testGetEntries() {
-    AtomicLong foo = metricsMap.get("foo");
-    AtomicLong bar = metricsMap.get("bar");
-    assertThat(metricsMap.entries(), containsInAnyOrder(
-        hasEntry("foo", foo),
-        hasEntry("bar", bar)));
-  }
-
-  private static Matcher<Map.Entry<String, AtomicLong>> hasEntry(
-      final String key, final AtomicLong value) {
-    return new TypeSafeMatcher<Entry<String, AtomicLong>>() {
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("Map.Entry{key=").appendValue(key)
-            .appendText(", value=").appendValue(value)
-            .appendText("}");
-      }
-
-      @Override
-      protected boolean matchesSafely(Entry<String, AtomicLong> item) {
-        return Objects.equals(key, item.getKey())
-            && Objects.equals(value, item.getValue());
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 084c445..bc768f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -18,13 +18,14 @@
 
 package org.apache.beam.sdk.metrics;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax;
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
-import static org.hamcrest.Matchers.equalTo;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.distributionMinMax;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.PipelineResult;
@@ -41,13 +42,13 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.hamcrest.CoreMatchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link Metrics}.
@@ -95,40 +96,40 @@ public class MetricsTest implements Serializable {
 
   @Test
   public void testDistributionToCell() {
-    MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setCurrentContainer(container);
+    MetricsContainer mockContainer = Mockito.mock(MetricsContainer.class);
+    Distribution mockDistribution = Mockito.mock(Distribution.class);
+    when(mockContainer.getDistribution(METRIC_NAME)).thenReturn(mockDistribution);
 
     Distribution distribution = Metrics.distribution(NS, NAME);
 
+    MetricsEnvironment.setCurrentContainer(mockContainer);
     distribution.update(5L);
 
-    DistributionCell cell = container.getDistribution(METRIC_NAME);
-    assertThat(cell.getCumulative(), equalTo(DistributionData.create(5, 1, 5, 5)));
+    verify(mockDistribution).update(5L);
 
     distribution.update(36L);
-    assertThat(cell.getCumulative(), equalTo(DistributionData.create(41, 2, 5, 36)));
-
     distribution.update(1L);
-    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 1, 36)));
+    verify(mockDistribution).update(36L);
+    verify(mockDistribution).update(1L);
   }
 
   @Test
   public void testCounterToCell() {
-    MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setCurrentContainer(container);
+    MetricsContainer mockContainer = Mockito.mock(MetricsContainer.class);
+    Counter mockCounter = Mockito.mock(Counter.class);
+    when(mockContainer.getCounter(METRIC_NAME)).thenReturn(mockCounter);
+
     Counter counter = Metrics.counter(NS, NAME);
-    CounterCell cell = container.getCounter(METRIC_NAME);
+
+    MetricsEnvironment.setCurrentContainer(mockContainer);
     counter.inc();
-    assertThat(cell.getCumulative(), CoreMatchers.equalTo(1L));
+    verify(mockCounter).inc(1);
 
     counter.inc(47L);
-    assertThat(cell.getCumulative(), CoreMatchers.equalTo(48L));
+    verify(mockCounter).inc(47);
 
     counter.dec(5L);
-    assertThat(cell.getCumulative(), CoreMatchers.equalTo(43L));
-
-    counter.dec();
-    assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
+    verify(mockCounter).inc(-5);
   }
 
   @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 12b7c78..691f7f4 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
 import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricMatchers;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -587,33 +587,29 @@ public class KafkaIOTest {
 
     Iterable<MetricResult<Long>> counters = metrics.counters();
 
-    assertThat(counters, hasItem(
-        MetricMatchers.attemptedMetricsResult(
-            elementsRead.namespace(),
-            elementsRead.name(),
-            readStep,
-            1000L)));
-
-    assertThat(counters, hasItem(
-        MetricMatchers.attemptedMetricsResult(
-            elementsReadBySplit.namespace(),
-            elementsReadBySplit.name(),
-            readStep,
-            1000L)));
-
-    assertThat(counters, hasItem(
-        MetricMatchers.attemptedMetricsResult(
-            bytesRead.namespace(),
-            bytesRead.name(),
-            readStep,
-            12000L)));
-
-    assertThat(counters, hasItem(
-        MetricMatchers.attemptedMetricsResult(
-            bytesReadBySplit.namespace(),
-            bytesReadBySplit.name(),
-            readStep,
-            12000L)));
+    assertThat(counters, hasItem(attemptedMetricsResult(
+        elementsRead.namespace(),
+        elementsRead.name(),
+        readStep,
+        1000L)));
+
+    assertThat(counters, hasItem(attemptedMetricsResult(
+        elementsReadBySplit.namespace(),
+        elementsReadBySplit.name(),
+        readStep,
+        1000L)));
+
+    assertThat(counters, hasItem(attemptedMetricsResult(
+        bytesRead.namespace(),
+        bytesRead.name(),
+        readStep,
+        12000L)));
+
+    assertThat(counters, hasItem(attemptedMetricsResult(
+        bytesReadBySplit.namespace(),
+        bytesReadBySplit.name(),
+        readStep,
+        12000L)));
 
     MetricQueryResults backlogElementsMetrics =
         result.metrics().queryMetrics(
@@ -912,7 +908,7 @@ public class KafkaIOTest {
 
 
       assertThat(metrics.counters(), hasItem(
-          MetricMatchers.attemptedMetricsResult(
+          attemptedMetricsResult(
               elementsWritten.namespace(),
               elementsWritten.name(),
               "writeToKafka",


Mime
View raw message