Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4DE3E185ED for ; Fri, 11 Dec 2015 15:26:33 +0000 (UTC) Received: (qmail 52548 invoked by uid 500); 11 Dec 2015 15:26:33 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 52509 invoked by uid 500); 11 Dec 2015 15:26:33 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 52440 invoked by uid 99); 11 Dec 2015 15:26:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 15:26:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E4535E0513; Fri, 11 Dec 2015 15:26:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Fri, 11 Dec 2015 15:26:33 -0000 Message-Id: <53bbbcbb70da422b9beda8d67354cbe8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-3121] Emit Final Watermark in Kafka Source [FLINK-3121] Emit Final Watermark in Kafka Source Kafka sources that don't read from any partition never emit a watermark, thereby blocking the progress of event-time in downstream operations. This changes the Kafka Source to emit a Long.MAX_VALUE watermark if it knows that it will never receive data. This also changes the Timestamp Extraction operator to react to a Long.MAX_VALUE watermark by emitting a Long.MAX_VALUE watermark itself. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd5714d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd5714d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd5714d Branch: refs/heads/master Commit: 6bd5714d2a045e581b1a761830d010598f803de7 Parents: 4b64887 Author: Aljoscha Krettek Authored: Wed Dec 9 12:13:22 2015 +0100 Committer: Aljoscha Krettek Committed: Fri Dec 11 10:45:34 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer.java | 8 ++- .../streaming/api/operators/StreamSource.java | 16 +++-- .../operators/ExtractTimestampsOperator.java | 15 ++--- .../streaming/timestamp/TimestampITCase.java | 62 ++++++++++++++++++++ 4 files changed, 82 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index c4fd654..b139e95 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; @@ -434,7 +435,12 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction } } else { - // this source never completes + // this source never completes, so emit a Long.MAX_VALUE watermark + // to not block watermark forwarding + if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) { + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + final Object waitLock = new Object(); while (running) { // wait until we are canceled http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 91c846f..996e32c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -60,6 +60,12 @@ public class StreamSource extends AbstractUdfStreamOperator extends AbstractUdfStreamOperator } @Override - public void close() throws Exception { - super.close(); - - // emit a final +Inf watermark, just like the sources - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - @Override public void processElement(StreamRecord element) throws Exception { long newTimestamp = 
userFunction.extractTimestamp(element.getValue(), element.getTimestamp()); output.collect(element.replace(element.getValue(), newTimestamp)); @@ -90,6 +82,11 @@ public class ExtractTimestampsOperator @Override public void processWatermark(Watermark mark) throws Exception { - // ignore them, since we are basically a watermark source + // if we receive a Long.MAX_VALUE watermark we forward it since it is used + // to signal the end of input and to not block watermark progress downstream + if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) { + currentWatermark = Long.MAX_VALUE; + output.emitWatermark(mark); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 8e7ada4..6c3ef40 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -448,6 +448,68 @@ public class TimestampITCase { } /** + * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. 
+ */ + @Test + public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception { + final int NUM_ELEMENTS = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + env.setParallelism(2); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + env.getConfig().setAutoWatermarkInterval(1); + + + DataStream source1 = env.addSource(new EventTimeSourceFunction() { + @Override + public void run(SourceContext ctx) throws Exception { + int index = 0; + while (index < NUM_ELEMENTS) { + ctx.collectWithTimestamp(index, index); + ctx.collectWithTimestamp(index - 1, index - 1); + index++; + ctx.emitWatermark(new Watermark(index-2)); + } + + // emit the final Long.MAX_VALUE watermark, do it twice and verify that + // we only see one in the result + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + + @Override + public void cancel() { + + } + }); + + source1.assignTimestamps(new TimestampExtractor() { + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + } + + @Override + public long extractWatermark(Integer element, long currentTimestamp) { + return Long.MIN_VALUE; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + }) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); + + + env.execute(); + + Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); + Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); + } + + /** * This tests whether the program throws an exception when an event-time source tries * to emit without timestamp. */