Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BC969200BE4 for ; Wed, 21 Dec 2016 21:23:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BB60B160B46; Wed, 21 Dec 2016 20:23:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ECD52160B39 for ; Wed, 21 Dec 2016 21:23:14 +0100 (CET) Received: (qmail 30508 invoked by uid 500); 21 Dec 2016 20:23:14 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 30364 invoked by uid 99); 21 Dec 2016 20:23:14 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Dec 2016 20:23:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id A6B2518C6BB for ; Wed, 21 Dec 2016 20:23:13 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id Wgv0mqpIDjbu for ; Wed, 21 Dec 2016 20:23:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 719735FDCF for ; Wed, 21 Dec 2016 20:23:10 +0000 (UTC) Received: (qmail 28585 invoked by uid 99); 21 Dec 2016 20:23:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Dec 2016 20:23:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 849BBF1832; Wed, 21 Dec 2016 20:23:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Wed, 21 Dec 2016 20:23:28 -0000 Message-Id: <76f7e84648b74a6e99e53241389d4107@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/50] incubator-beam git commit: Change counter name in TestDataflowRunner archived-at: Wed, 21 Dec 2016 20:23:15 -0000 Change counter name in TestDataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b055d2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b055d2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b055d2d Branch: refs/heads/gearpump-runner Commit: 6b055d2debe879816808b4c1ee847e34cc1df5c0 Parents: 1ee191f Author: Joshua Litt Authored: Sat Dec 17 11:12:12 2016 -0800 Committer: Joshua Litt Committed: Sat Dec 17 11:12:12 2016 -0800 ---------------------------------------------------------------------- .../dataflow/testing/TestDataflowRunner.java | 29 ++++++++++++++++---- .../testing/TestDataflowRunnerTest.java | 16 ++++++++++- 2 files changed, 39 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 4b0fcf2..0564448 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -61,7 +61,12 @@ import org.slf4j.LoggerFactory; */ public class TestDataflowRunner extends PipelineRunner { private static final String TENTATIVE_COUNTER = "tentative"; - private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + // See https://issues.apache.org/jira/browse/BEAM-1170 + // we need to either fix the API or pipe the DRAINED signal through + @VisibleForTesting + static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + @VisibleForTesting + static final String WATERMARK_METRIC_SUFFIX = "DataWatermark"; private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); @@ -248,6 +253,23 @@ public class TestDataflowRunner extends PipelineRunner { } /** + * Checks wether a metric is a streaming watermark. + * + * @return true if the metric is a watermark. + */ + boolean isWatermark(MetricUpdate metric) { + if (metric.getName() == null || metric.getName().getName() == null) { + return false; // no name -> shouldn't happen, not the watermark + } + if (metric.getScalar() == null) { + return false; // no scalar value -> not the watermark + } + String name = metric.getName().getName(); + return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX) + || name.endsWith(WATERMARK_METRIC_SUFFIX); + } + + /** * Check watermarks of the streaming job. At least one watermark metric must exist. * * @return true if all watermarks are at max, false otherwise. @@ -256,10 +278,7 @@ public class TestDataflowRunner extends PipelineRunner { boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) { boolean hasMaxWatermark = false; for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null - || metric.getName().getName() == null - || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX) - || metric.getScalar() == null) { + if (!isWatermark(metric)) { continue; } BigDecimal watermark = (BigDecimal) metric.getScalar(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 366c6a1..da5630b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.testing; +import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX; +import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -95,7 +97,6 @@ public class TestDataflowRunnerTest { @Mock private MockLowLevelHttpRequest request; @Mock private GcsUtil mockGcsUtil; - private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2); private TestDataflowPipelineOptions options; @@ -411,6 +412,19 @@ public class TestDataflowRunnerTest { } @Test + public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException { + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + doReturn(State.RUNNING).when(job).getState(); + assertTrue(runner.atMaxWatermark(job, metrics)); + } + + @Test public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options);