Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9ACBE200C92 for ; Mon, 12 Jun 2017 18:55:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 997DD160BD9; Mon, 12 Jun 2017 16:55:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CC39F160BED for ; Mon, 12 Jun 2017 18:55:18 +0200 (CEST) Received: (qmail 7685 invoked by uid 500); 12 Jun 2017 16:55:18 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 7554 invoked by uid 99); 12 Jun 2017 16:55:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 16:55:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F98BE9671; Mon, 12 Jun 2017 16:55:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 12 Jun 2017 16:55:21 -0000 Message-Id: <3e79878a753342949a2fe7f2033164a3@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/50] [abbrv] beam git commit: Shutdown Flink Streaming Pipeline when reaching +Inf watermark archived-at: Mon, 12 Jun 2017 16:55:19 -0000 Shutdown Flink Streaming Pipeline when reaching +Inf watermark Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c83ffe0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c83ffe0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c83ffe0 Branch: refs/heads/gearpump-runner Commit: 9c83ffe0cdc6636d2187bf9439a73a3b45756d50 Parents: caecac3 Author: Aljoscha Krettek Authored: Mon Jun 5 12:19:00 2017 +0200 Committer: Ismaël Mejía Committed: Wed Jun 7 23:13:52 2017 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c83ffe0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 6055a43..e75072a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -436,6 +437,10 @@ public class UnboundedSourceWrapper< } } context.emitWatermark(new Watermark(watermarkMillis)); + + if (watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + this.isRunning = false; + } } setNextWatermarkTimer(this.runtimeContext); }