Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A8C27200CF6 for ; Thu, 20 Jul 2017 21:53:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A742316BF81; Thu, 20 Jul 2017 19:53:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 17CD016BF6B for ; Thu, 20 Jul 2017 21:53:05 +0200 (CEST) Received: (qmail 36646 invoked by uid 500); 20 Jul 2017 19:52:59 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 35829 invoked by uid 99); 20 Jul 2017 19:52:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Jul 2017 19:52:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9A827F3289; Thu, 20 Jul 2017 19:52:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbonofre@apache.org To: commits@beam.apache.org Date: Thu, 20 Jul 2017 19:53:17 -0000 Message-Id: In-Reply-To: <8dad956546b246cd8a7bbc3979e4b420@git.apache.org> References: <8dad956546b246cd8a7bbc3979e4b420@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/50] [abbrv] beam git commit: [BEAM-2084] Adding querying facility for distribution metrics in Java archived-at: Thu, 20 Jul 2017 19:53:07 -0000 [BEAM-2084] Adding querying facility for distribution metrics in Java Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a48eefac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a48eefac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a48eefac Branch: refs/heads/DSL_SQL Commit: a48eeface8c5257f34e85c22f312ec03801b0f82 Parents: 7c36318 Author: Pablo Authored: Thu May 4 14:56:14 2017 -0700 Committer: Ben Chambers Committed: Tue Jul 18 09:58:47 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/examples/WordCount.java | 4 + pom.xml | 2 +- .../beam/runners/dataflow/DataflowMetrics.java | 310 +++++++++++++------ .../runners/dataflow/DataflowPipelineJob.java | 4 + .../runners/dataflow/DataflowMetricsTest.java | 174 ++++++++++- .../beam/sdk/metrics/MetricResultsMatchers.java | 2 +- 6 files changed, 388 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index bfa7eb3..2d568ce 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -21,6 +21,7 @@ import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -88,9 +89,12 @@ public class WordCount { */ static class ExtractWordsFn extends DoFn { private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); + private final Distribution lineLenDist = Metrics.distribution( + ExtractWordsFn.class, "lineLenDistro"); @ProcessElement public void processElement(ProcessContext c) { + lineLenDist.update(c.element().length()); if (c.element().trim().isEmpty()) { emptyLines.inc(); } http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d9ab9ae..d27d367 100644 --- a/pom.xml +++ b/pom.xml @@ -112,7 +112,7 @@ v1-rev6-1.22.0 0.1.0 v2-rev8-1.22.0 - v1b3-rev196-1.22.0 + v1b3-rev198-1.20.0 0.5.160222 1.4.0 1.3.0 http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 330cc7e..4c9c493 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -19,7 +19,9 @@ package org.apache.beam.runners.dataflow; import static com.google.common.base.MoreObjects.firstNonNull; +import com.google.api.client.util.ArrayMap; import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; import com.google.auto.value.AutoValue; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; @@ -28,6 +30,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.runners.core.construction.metrics.MetricFiltering; import org.apache.beam.runners.core.construction.metrics.MetricKey; import org.apache.beam.sdk.metrics.DistributionResult; @@ -73,39 +76,6 @@ class DataflowMetrics extends MetricResults { } /** - * Build an immutable map that serves as a hash key for a metric update. - * @return a {@link MetricKey} that can be hashed and used to identify a metric. - */ - private MetricKey metricHashKey( - com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { - String fullStepName = metricUpdate.getName().getContext().get("step"); - if (dataflowPipelineJob.transformStepNames == null - || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) { - // If we can't translate internal step names to user step names, we just skip them - // altogether. - return null; - } - fullStepName = dataflowPipelineJob.transformStepNames - .inverse().get(fullStepName).getFullName(); - return MetricKey.create( - fullStepName, - MetricName.named( - metricUpdate.getName().getContext().get("namespace"), - metricUpdate.getName().getName())); - } - - /** - * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative - * update or not. - * @return true if update is tentative, false otherwise - */ - private boolean isMetricTentative( - com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { - return (metricUpdate.getName().getContext().containsKey("tentative") - && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true")); - } - - /** * Take a list of metric updates coming from the Dataflow service, and format it into a * Metrics API MetricQueryResults instance. * @param metricUpdates @@ -114,68 +84,8 @@ class DataflowMetrics extends MetricResults { private MetricQueryResults populateMetricQueryResults( List metricUpdates, MetricsFilter filter) { - // Separate metric updates by name and by tentative/committed. - HashMap - tentativeByName = new HashMap<>(); - HashMap - committedByName = new HashMap<>(); - HashSet metricHashKeys = new HashSet<>(); - - // If the Context of the metric update does not have a namespace, then these are not - // actual metrics counters. - for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) { - if (Objects.equal(update.getName().getOrigin(), "user") - && update.getName().getContext().containsKey("namespace")) { - MetricKey key = metricHashKey(update); - if (key == null) { - continue; - } - metricHashKeys.add(key); - if (isMetricTentative(update)) { - tentativeByName.put(key, update); - } else { - committedByName.put(key, update); - } - } - } - // Create the lists with the metric result information. - ImmutableList.Builder> counterResults = ImmutableList.builder(); - ImmutableList.Builder> distributionResults = - ImmutableList.builder(); - ImmutableList.Builder> gaugeResults = ImmutableList.builder(); - for (MetricKey metricKey : metricHashKeys) { - if (!MetricFiltering.matches(filter, metricKey)) { - // Skip unmatched metrics early. - continue; - } - - // This code is not robust to evolutions in the types of metrics that can be returned, so - // wrap it in a try-catch and log errors. - try { - String metricName = metricKey.metricName().name(); - if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") - || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { - // Skip distribution metrics, as these are not yet properly supported. - LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow" - + " User Interface"); - continue; - } - - String namespace = metricKey.metricName().namespace(); - String step = metricKey.stepName(); - Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue(); - Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue(); - counterResults.add( - DataflowMetricResult.create( - MetricName.named(namespace, metricName), step, committed, attempted)); - } catch (Exception e) { - LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter); - } - } - return DataflowMetricQueryResults.create( - counterResults.build(), - distributionResults.build(), - gaugeResults.build()); + return DataflowMetricQueryResultsFactory.create(dataflowPipelineJob, metricUpdates, filter) + .build(); } private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { @@ -214,6 +124,214 @@ class DataflowMetrics extends MetricResults { return result; } + private static class DataflowMetricResultExtractor { + private final ImmutableList.Builder> counterResults; + private final ImmutableList.Builder> distributionResults; + private final ImmutableList.Builder> gaugeResults; + private final boolean isStreamingJob; + + DataflowMetricResultExtractor(boolean isStreamingJob) { + counterResults = ImmutableList.builder(); + distributionResults = ImmutableList.builder(); + gaugeResults = ImmutableList.builder(); + this.isStreamingJob = isStreamingJob; + } + + public void addMetricResult( + MetricKey metricKey, + @Nullable com.google.api.services.dataflow.model.MetricUpdate committed, + @Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) { + if (committed == null || attempted == null) { + LOG.warn( + "Metric {} did not have both a committed ({}) and tentative value ({}).", + metricKey, committed, attempted); + } else if (committed.getDistribution() != null && attempted.getDistribution() != null) { + // distribution metric + DistributionResult value = getDistributionValue(committed); + distributionResults.add( + DataflowMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + isStreamingJob ? null : value, // Committed + isStreamingJob ? value : null)); // Attempted + /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. + * In Dataflow batch jobs, only COMMITTED metrics are available. + * Reporting the appropriate metric depending on whether it's a batch/streaming job. + */ + } else if (committed.getScalar() != null && attempted.getScalar() != null) { + // counter metric + Long value = getCounterValue(committed); + counterResults.add( + DataflowMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + isStreamingJob ? null : value, // Committed + isStreamingJob ? value : null)); // Attempted + /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. + * In Dataflow batch jobs, only COMMITTED metrics are available. + * Reporting the appropriate metric depending on whether it's a batch/streaming job. + */ + } else { + // This is exceptionally unexpected. We expect matching user metrics to only have the + // value types provided by the Metrics API. + LOG.warn("Unexpected / mismatched metric types." + + " Please report JOB ID to Dataflow Support. Metric key: {}." + + " Committed / attempted Metric updates: {} / {}", + metricKey.toString(), committed.toString(), attempted.toString()); + } + } + + private Long getCounterValue(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + if (metricUpdate.getScalar() == null) { + return 0L; + } + return ((Number) metricUpdate.getScalar()).longValue(); + } + + private DistributionResult getDistributionValue( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + if (metricUpdate.getDistribution() == null) { + return DistributionResult.ZERO; + } + ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution(); + Long count = ((Number) distributionMap.get("count")).longValue(); + Long min = ((Number) distributionMap.get("min")).longValue(); + Long max = ((Number) distributionMap.get("max")).longValue(); + Long sum = ((Number) distributionMap.get("sum")).longValue(); + return DistributionResult.create(sum, count, min, max); + } + + public Iterable> getDistributionResults() { + return distributionResults.build(); + } + + public Iterable> getCounterResults() { + return counterResults.build(); + } + + public Iterable> getGaugeResults() { + return gaugeResults.build(); + } + } + + private static class DataflowMetricQueryResultsFactory { + private final Iterable metricUpdates; + private final MetricsFilter filter; + private final HashMap + tentativeByName; + private final HashMap + committedByName; + private final HashSet metricHashKeys; + private final DataflowPipelineJob dataflowPipelineJob; + + public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob, + Iterable metricUpdates, + MetricsFilter filter) { + return new DataflowMetricQueryResultsFactory(dataflowPipelineJob, metricUpdates, filter); + } + + private DataflowMetricQueryResultsFactory(DataflowPipelineJob dataflowPipelineJob, + Iterable metricUpdates, + MetricsFilter filter) { + this.dataflowPipelineJob = dataflowPipelineJob; + this.metricUpdates = metricUpdates; + this.filter = filter; + + tentativeByName = new HashMap<>(); + committedByName = new HashMap<>(); + metricHashKeys = new HashSet<>(); + } + + /** + * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative + * update or not. + * @return true if update is tentative, false otherwise + */ + private boolean isMetricTentative( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + return (metricUpdate.getName().getContext().containsKey("tentative") + && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true")); + } + + /** + * Build an {@link MetricKey} that serves as a hash key for a metric update. + * @return a {@link MetricKey} that can be hashed and used to identify a metric. + */ + private MetricKey getMetricHashKey( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + String fullStepName = metricUpdate.getName().getContext().get("step"); + if (dataflowPipelineJob.transformStepNames == null + || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) { + // If we can't translate internal step names to user step names, we just skip them + // altogether. + return null; + } + fullStepName = dataflowPipelineJob.transformStepNames + .inverse().get(fullStepName).getFullName(); + return MetricKey.create( + fullStepName, + MetricName.named( + metricUpdate.getName().getContext().get("namespace"), + metricUpdate.getName().getName())); + } + + private void buildMetricsIndex() { + // If the Context of the metric update does not have a namespace, then these are not + // actual metrics counters. + for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) { + if (update.getName().getOrigin() != null + && (!update.getName().getOrigin().toLowerCase().equals("user") + || !update.getName().getContext().containsKey("namespace"))) { + // Skip non-user metrics, which should have both a "user" origin and a namespace. + continue; + } + + MetricKey updateKey = getMetricHashKey(update); + if (updateKey == null || !MetricFiltering.matches(filter, updateKey)) { + // Skip unmatched metrics early. + continue; + } + + metricHashKeys.add(updateKey); + if (isMetricTentative(update)) { + MetricUpdate previousUpdate = tentativeByName.put(updateKey, update); + if (previousUpdate != null) { + LOG.warn("Metric {} already had a tentative value of {}", updateKey, previousUpdate); + } + } else { + MetricUpdate previousUpdate = committedByName.put(updateKey, update); + if (previousUpdate != null) { + LOG.warn("Metric {} already had a committed value of {}", updateKey, previousUpdate); + } + } + } + } + + public MetricQueryResults build() { + buildMetricsIndex(); + + DataflowMetricResultExtractor extractor = new DataflowMetricResultExtractor( + dataflowPipelineJob.getDataflowOptions().isStreaming()); + for (MetricKey metricKey : metricHashKeys) { + String metricName = metricKey.metricName().name(); + if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") + || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { + // Skip distribution metrics, as these are not yet properly supported. + // TODO: remove this when distributions stop being broken up for the UI. + continue; + } + + extractor.addMetricResult(metricKey, + committedByName.get(metricKey), + tentativeByName.get(metricKey)); + } + return DataflowMetricQueryResults.create( + extractor.getCounterResults(), + extractor.getDistributionResults(), + extractor.getGaugeResults()); + } + } + @AutoValue abstract static class DataflowMetricQueryResults implements MetricQueryResults { public static MetricQueryResults create( @@ -231,7 +349,9 @@ class DataflowMetrics extends MetricResults { // and the generated constructor is usable and consistent public abstract MetricName name(); public abstract String step(); + @Nullable public abstract T committed(); + @Nullable public abstract T attempted(); public static MetricResult create(MetricName name, String scope, http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 2d23983..e30d426 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -164,6 +164,10 @@ public class DataflowPipelineJob implements PipelineResult { return dataflowOptions.getProject(); } + public DataflowPipelineOptions getDataflowOptions() { + return dataflowOptions; + } + /** * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable. * http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index c3c741c..05fe687 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.client.util.ArrayMap; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.JobMetrics; @@ -38,9 +40,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.math.BigDecimal; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -95,6 +99,9 @@ public class DataflowMetricsTest { modelJob.setCurrentState(State.RUNNING.toString()); DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); when(job.getState()).thenReturn(State.RUNNING); job.jobId = JOB_ID; @@ -115,6 +122,9 @@ public class DataflowMetricsTest { modelJob.setCurrentState(State.RUNNING.toString()); DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); when(job.getState()).thenReturn(State.DONE); job.jobId = JOB_ID; @@ -131,11 +141,8 @@ public class DataflowMetricsTest { verify(dataflowClient, times(1)).getJobMetrics(JOB_ID); } - private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step, - long scalar, boolean tentative) { - MetricUpdate update = new MetricUpdate(); - update.setScalar(new BigDecimal(scalar)); - + private MetricUpdate setStructuredName(MetricUpdate update, String name, String namespace, + String step, boolean tentative) { MetricStructuredName structuredName = new MetricStructuredName(); structuredName.setName(name); structuredName.setOrigin("user"); @@ -150,10 +157,34 @@ public class DataflowMetricsTest { return update; } + private MetricUpdate makeDistributionMetricUpdate(String name, String namespace, String step, + Long sum, Long count, Long min, Long max, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + ArrayMap distribution = ArrayMap.create(); + distribution.add("count", new BigDecimal(count)); + distribution.add("mean", new BigDecimal(sum / count)); + distribution.add("sum", new BigDecimal(sum)); + distribution.add("min", new BigDecimal(min)); + distribution.add("max", new BigDecimal(max)); + update.setDistribution(distribution); + return setStructuredName(update, name, namespace, step, tentative); + } + + private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step, + long scalar, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + update.setScalar(new BigDecimal(scalar)); + return setStructuredName(update, name, namespace, step, tentative); + + } + @Test public void testSingleCounterUpdates() throws IOException { JobMetrics jobMetrics = new JobMetrics(); DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); when(job.getState()).thenReturn(State.RUNNING); job.jobId = JOB_ID; @@ -179,7 +210,7 @@ public class DataflowMetricsTest { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L))); } @@ -190,6 +221,9 @@ public class DataflowMetricsTest { DataflowClient dataflowClient = mock(DataflowClient.class); when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); when(job.getState()).thenReturn(State.RUNNING); job.jobId = JOB_ID; @@ -202,24 +236,97 @@ public class DataflowMetricsTest { // the job metrics results. jobMetrics.setMetrics(ImmutableList.of( makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false), - makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true), + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, true), makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, false), makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, true))); DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L))); } @Test + public void testDistributionUpdates() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + AppliedPTransform myStep2 = mock(AppliedPTransform.class); + when(myStep2.getFullName()).thenReturn("myStepName"); + job.transformStepNames = HashBiMap.create(); + job.transformStepNames.put(myStep2, "s2"); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + jobMetrics.setMetrics(ImmutableList.of( + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, false), + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.distributions(), contains( + attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName", + (DistributionResult) null))); + assertThat(result.distributions(), contains( + committedMetricsResult("distributionNamespace", "distributionName", "myStepName", + DistributionResult.create(18, 2, 2, 16)))); + } + + @Test + public void testDistributionUpdatesStreaming() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(true); + when(job.getDataflowOptions()).thenReturn(options); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + AppliedPTransform myStep2 = mock(AppliedPTransform.class); + when(myStep2.getFullName()).thenReturn("myStepName"); + job.transformStepNames = HashBiMap.create(); + job.transformStepNames.put(myStep2, "s2"); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + jobMetrics.setMetrics(ImmutableList.of( + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, false), + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.distributions(), contains( + committedMetricsResult("distributionNamespace", "distributionName", "myStepName", + (DistributionResult) null))); + assertThat(result.distributions(), contains( + attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName", + DistributionResult.create(18, 2, 2, 16)))); + } + + @Test public void testMultipleCounterUpdates() throws IOException { JobMetrics jobMetrics = new JobMetrics(); DataflowClient dataflowClient = mock(DataflowClient.class); when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); when(job.getState()).thenReturn(State.RUNNING); job.jobId = JOB_ID; @@ -251,12 +358,57 @@ public class DataflowMetricsTest { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L), - attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L), - attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1233L))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null), + attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null), + attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L), committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L), committedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L))); } + + @Test + public void testMultipleCounterUpdatesStreaming() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(true); + when(job.getDataflowOptions()).thenReturn(options); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + AppliedPTransform myStep2 = mock(AppliedPTransform.class); + when(myStep2.getFullName()).thenReturn("myStepName"); + job.transformStepNames = HashBiMap.create(); + job.transformStepNames.put(myStep2, "s2"); + AppliedPTransform myStep3 = mock(AppliedPTransform.class); + when(myStep3.getFullName()).thenReturn("myStepName3"); + job.transformStepNames.put(myStep3, "s3"); + AppliedPTransform myStep4 = mock(AppliedPTransform.class); + when(myStep4.getFullName()).thenReturn("myStepName4"); + job.transformStepNames.put(myStep4, "s4"); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + jobMetrics.setMetrics(ImmutableList.of( + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false), + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true), + makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false), + makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true), + makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false), + makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.counters(), containsInAnyOrder( + committedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null), + committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null), + committedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null))); + assertThat(result.counters(), containsInAnyOrder( + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L), + attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L), + attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L))); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java index 5031952..030a759 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java @@ -96,7 +96,7 @@ public class MetricResultsMatchers { if (result1 instanceof GaugeResult) { return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value()); } else { - return result1.equals(result2); + return Objects.equals(result1, result2); } }