Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 32371200BF1 for ; Tue, 20 Dec 2016 06:05:42 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2F9BF160B21; Tue, 20 Dec 2016 05:05:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7F579160B3B for ; Tue, 20 Dec 2016 06:05:40 +0100 (CET) Received: (qmail 99876 invoked by uid 500); 20 Dec 2016 05:05:39 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 99618 invoked by uid 99); 20 Dec 2016 05:05:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Dec 2016 05:05:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E0F51DFCF7; Tue, 20 Dec 2016 05:05:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 20 Dec 2016 05:05:42 -0000 Message-Id: In-Reply-To: <69bc1fd66f6c4b5c90fb326315c95bfb@git.apache.org> References: <69bc1fd66f6c4b5c90fb326315c95bfb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/7] flink git commit: [FLINK-4391] Add timeout parameter for asynchronous I/O archived-at: Tue, 20 Dec 2016 05:05:42 -0000 [FLINK-4391] Add timeout parameter for asynchronous I/O The timeout defines how long an asynchronous I/O operation can take. If the operation takes longer than the timeout, then it is failed with an TimeoutException. Annotate classes with internal Annotation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c5a8711 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c5a8711 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c5a8711 Branch: refs/heads/master Commit: 6c5a8711d80dfcea20967aea009bac51122d5094 Parents: ad603d5 Author: Till Rohrmann Authored: Tue Dec 20 03:42:25 2016 +0100 Committer: Till Rohrmann Committed: Tue Dec 20 05:04:51 2016 +0100 ---------------------------------------------------------------------- .../examples/async/AsyncIOExample.java | 19 ++- .../api/datastream/AsyncDataStream.java | 67 +++++++--- .../api/operators/async/AsyncWaitOperator.java | 5 +- .../streaming/api/operators/async/Emitter.java | 2 + .../api/operators/async/OperatorActions.java | 2 + .../async/queue/AsyncCollectionResult.java | 3 + .../api/operators/async/queue/AsyncResult.java | 2 + .../async/queue/AsyncWatermarkResult.java | 2 + .../async/queue/OrderedStreamElementQueue.java | 2 + .../async/queue/StreamElementQueue.java | 2 + .../async/queue/StreamElementQueueEntry.java | 2 + .../async/queue/StreamRecordQueueEntry.java | 2 + .../queue/UnorderedStreamElementQueue.java | 2 + .../async/queue/WatermarkQueueEntry.java | 2 + .../operators/async/AsyncWaitOperatorTest.java | 131 +++++++++++++++++-- .../util/AbstractStreamOperatorTestHarness.java | 42 ++++-- .../util/OneInputStreamOperatorTestHarness.java | 21 +++ .../streaming/api/StreamingOperatorsITCase.java | 17 ++- 18 files changed, 277 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java index 6dde537..2b05983 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java @@ -206,7 +206,8 @@ public class AsyncIOExample { "[--maxCount ] " + "[--sleepFactor ] [--failRatio ] " + "[--waitMode ] [--waitOperatorParallelism ] " + - "[--eventType ] [--shutdownWaitTS ]"); + "[--eventType ] [--shutdownWaitTS ]" + + "[--timeout ]"); } public static void main(String[] args) throws Exception { @@ -226,6 +227,7 @@ public class AsyncIOExample { final int taskNum; final String timeType; final long shutdownWaitTS; + final long timeout; try { // check the configuration for the job @@ -238,6 +240,7 @@ public class AsyncIOExample { taskNum = params.getInt("waitOperatorParallelism", 1); timeType = params.get("eventType", "EventTime"); shutdownWaitTS = params.getLong("shutdownWaitTS", 20000); + timeout = params.getLong("timeout", 10000L); } catch (Exception e) { printUsage(); @@ -292,10 +295,20 @@ public class AsyncIOExample { // add async operator to streaming job DataStream result; if (ORDERED.equals(mode)) { - result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum); + result = AsyncDataStream.orderedWait( + inputStream, + function, + timeout, + TimeUnit.MILLISECONDS, + 20).setParallelism(taskNum); } else { - result = AsyncDataStream.unorderedWait(inputStream, function, 20).setParallelism(taskNum); + result = AsyncDataStream.unorderedWait( + inputStream, + function, + timeout, + TimeUnit.MILLISECONDS, + 20).setParallelism(taskNum); } // add a reduce to get the sum of each keys. http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java index 4fefde0..8132d28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @@ -24,6 +24,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import java.util.concurrent.TimeUnit; + /** * A helper class to apply {@link AsyncFunction} to a data stream. *

@@ -31,7 +33,7 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; * DataStream input = ... * AsyncFunction> asyncFunc = ... * - * AsyncDataStream.orderedWait(input, asyncFunc, 100); + * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100); * } * */ @@ -40,13 +42,14 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; public class AsyncDataStream { public enum OutputMode { ORDERED, UNORDERED } - private static final int DEFAULT_BUFFER_SIZE = 100; + private static final int DEFAULT_QUEUE_CAPACITY = 100; /** * Add an AsyncWaitOperator. * * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added. * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}. + * @param timeout for the asynchronous operation to complete * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside. * @param mode Processing mode for {@link AsyncWaitOperator}. * @param Input type. @@ -56,6 +59,7 @@ public class AsyncDataStream { private static SingleOutputStreamOperator addOperator( DataStream in, AsyncFunction func, + long timeout, int bufSize, OutputMode mode) { @@ -64,8 +68,11 @@ public class AsyncDataStream { true, in.getType(), Utils.getCallLocationName(), true); // create transform - AsyncWaitOperator operator = - new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func), bufSize, mode); + AsyncWaitOperator operator = new AsyncWaitOperator<>( + in.getExecutionEnvironment().clean(func), + timeout, + bufSize, + mode); return in.transform("async wait operator", outTypeInfo, operator); } @@ -75,7 +82,9 @@ public class AsyncDataStream { * * @param in Input {@link DataStream} * @param func {@link AsyncFunction} - * @param bufSize The max number of async i/o operation that can be triggered + * @param timeout for the asynchronous operation to complete + * @param timeUnit of the given timeout + * @param capacity The max number of async i/o operation that can be triggered * @param Type of input record * @param Type of output record * @return A new {@link SingleOutputStreamOperator}. @@ -83,30 +92,44 @@ public class AsyncDataStream { public static SingleOutputStreamOperator unorderedWait( DataStream in, AsyncFunction func, - int bufSize) { - return addOperator(in, func, bufSize, OutputMode.UNORDERED); + long timeout, + TimeUnit timeUnit, + int capacity) { + return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED); } /** * Add an AsyncWaitOperator. The order of output stream records may be reordered. * @param in Input {@link DataStream} * @param func {@link AsyncFunction} + * @param timeout for the asynchronous operation to complete + * @param timeUnit of the given timeout * @param Type of input record * @param Type of output record * @return A new {@link SingleOutputStreamOperator}. */ public static SingleOutputStreamOperator unorderedWait( DataStream in, - AsyncFunction func) { - return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.UNORDERED); + AsyncFunction func, + long timeout, + TimeUnit timeUnit) { + return addOperator( + in, + func, + timeUnit.toMillis(timeout), + DEFAULT_QUEUE_CAPACITY, + OutputMode.UNORDERED); } /** - * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones. + * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as + * input ones. * * @param in Input {@link DataStream} * @param func {@link AsyncFunction} - * @param bufSize The max number of async i/o operation that can be triggered + * @param timeout for the asynchronous operation to complete + * @param timeUnit of the given timeout + * @param capacity The max number of async i/o operation that can be triggered * @param Type of input record * @param Type of output record * @return A new {@link SingleOutputStreamOperator}. @@ -114,22 +137,34 @@ public class AsyncDataStream { public static SingleOutputStreamOperator orderedWait( DataStream in, AsyncFunction func, - int bufSize) { - return addOperator(in, func, bufSize, OutputMode.ORDERED); + long timeout, + TimeUnit timeUnit, + int capacity) { + return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED); } /** - * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones. + * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as + * input ones. * * @param in Input {@link DataStream} * @param func {@link AsyncFunction} + * @param timeout for the asynchronous operation to complete + * @param timeUnit of the given timeout * @param Type of input record * @param Type of output record * @return A new {@link SingleOutputStreamOperator}. */ public static SingleOutputStreamOperator orderedWait( DataStream in, - AsyncFunction func) { - return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.ORDERED); + AsyncFunction func, + long timeout, + TimeUnit timeUnit) { + return addOperator( + in, + func, + timeUnit.toMillis(timeout), + DEFAULT_QUEUE_CAPACITY, + OutputMode.ORDERED); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 88fc833..754b754 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -114,6 +114,7 @@ public class AsyncWaitOperator public AsyncWaitOperator( AsyncFunction asyncFunction, + long timeout, int capacity, AsyncDataStream.OutputMode outputMode) { super(asyncFunction); @@ -124,7 +125,7 @@ public class AsyncWaitOperator this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode"); - this.timeout = -1L; + this.timeout = timeout; } @Override @@ -200,7 +201,7 @@ public class AsyncWaitOperator if (timeout > 0L) { // register a timeout for this AsyncStreamRecordBufferEntry - long timeoutTimestamp = timeout + System.currentTimeMillis(); + long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer( timeoutTimestamp, http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java index 4b22aaa..c122d23 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult; @@ -37,6 +38,7 @@ import java.util.Collection; * * @param Type of the output elements */ +@Internal public class Emitter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(Emitter.class); http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java index 5a2e43c..916b412 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java @@ -18,11 +18,13 @@ package org.apache.flink.streaming.api.operators.async; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.StreamOperator; /** * Interface for {@link StreamOperator} actions. */ +@Internal public interface OperatorActions { /** http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java index 8088bf0..6226ae6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; + import java.util.Collection; /** @@ -25,6 +27,7 @@ import java.util.Collection; * * @param Type of the collection elements. */ +@Internal public interface AsyncCollectionResult extends AsyncResult { boolean hasTimestamp(); http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java index 1a99928..751de76 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -26,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; * either be a {@link Watermark} or a collection of new output elements produced by the * {@link AsyncFunction}. */ +@Internal public interface AsyncResult { /** http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java index c19b520..68bde3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java @@ -18,11 +18,13 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.watermark.Watermark; /** * {@link AsyncResult} subclass for asynchronous result {@link Watermark}. */ +@Internal public interface AsyncWatermarkResult extends AsyncResult { /** * Get the resulting watermark. http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java index 2bbcb6c..414b3c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.util.Preconditions; @@ -37,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock; * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly * follows the insertion order (element cannot overtake each other). */ +@Internal public class OrderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java index 1a2c4a8..e02b8b0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; import java.util.Collection; @@ -25,6 +26,7 @@ import java.util.Collection; /** * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. */ +@Internal public interface StreamElementQueue { /** http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java index 06ebf3c..7db9d4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -32,6 +33,7 @@ import java.util.concurrent.Executor; * * @param Type of the result */ +@Internal public abstract class StreamElementQueueEntry implements AsyncResult { /** Stream element */ http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java index f0e707e..c654702 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; @@ -34,6 +35,7 @@ import java.util.Collection; * * @param Type of the asynchronous collection result */ +@Internal public class StreamRecordQueueEntry extends StreamElementQueueEntry> implements AsyncCollectionResult, AsyncCollector { http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java index 603d8cc..f244008 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.util.Preconditions; @@ -41,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock; * and no watermark can overtake a stream record. However, stream records falling in the same * segment between two watermarks can overtake each other (their emission order is not guaranteed). */ +@Internal public class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java index 6fe4f44..1f48303 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.async.queue; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.streaming.api.watermark.Watermark; @@ -25,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; /** * {@link StreamElementQueueEntry} implementation for the {@link Watermark}. */ +@Internal public class WatermarkQueueEntry extends StreamElementQueueEntry implements AsyncWatermarkResult { private final Future future; http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 10ee14f..d9e885e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.async; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -27,11 +28,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -51,6 +56,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.util.ArrayDeque; import java.util.Collections; @@ -59,11 +65,16 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link AsyncWaitOperator}. These test that: @@ -77,10 +88,12 @@ import static org.junit.Assert.assertEquals; */ public class AsyncWaitOperatorTest extends TestLogger { + private static final long TIMEOUT = 1000L; + private static class MyAsyncFunction extends RichAsyncFunction { private static final long serialVersionUID = 8522411971886428444L; - private static final long TIMEOUT = 5000L; + private static final long TERMINATION_TIMEOUT = 5000L; private static final int THREAD_POOL_SIZE = 10; static ExecutorService executorService; @@ -114,7 +127,7 @@ public class AsyncWaitOperatorTest extends TestLogger { executorService.shutdown(); try { - if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException interrupted) { @@ -219,7 +232,11 @@ public class AsyncWaitOperatorTest extends TestLogger { } private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception { - final AsyncWaitOperator operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode); + final AsyncWaitOperator operator = new AsyncWaitOperator<>( + new MyAsyncFunction(), + TIMEOUT, + 2, + mode); final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE); @@ -280,7 +297,8 @@ public class AsyncWaitOperatorTest extends TestLogger { } private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception { - final AsyncWaitOperator operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode); + final AsyncWaitOperator operator = new AsyncWaitOperator<>( + new MyAsyncFunction(), TIMEOUT, 6, mode); final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE); @@ -380,10 +398,20 @@ public class AsyncWaitOperatorTest extends TestLogger { DataStream input = chainEnv.fromElements(1, 2, 3); if (withLazyFunction) { - input = AsyncDataStream.orderedWait(input, new LazyAsyncFunction(), 6); + input = AsyncDataStream.orderedWait( + input, + new LazyAsyncFunction(), + TIMEOUT, + TimeUnit.MILLISECONDS, + 6); } else { - input = AsyncDataStream.orderedWait(input, new MyAsyncFunction(), 6); + input = AsyncDataStream.orderedWait( + input, + new MyAsyncFunction(), + TIMEOUT, + TimeUnit.MILLISECONDS, + 6); } // the map function is designed to chain after async function. we place an Integer object in it and @@ -407,7 +435,12 @@ public class AsyncWaitOperatorTest extends TestLogger { } }); - input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3); + input = AsyncDataStream.unorderedWait( + input, + new MyAsyncFunction(), + TIMEOUT, + TimeUnit.MILLISECONDS, + 3); input.map(new MapFunction() { private static final long serialVersionUID = 5162085254238405527L; @@ -432,8 +465,11 @@ public class AsyncWaitOperatorTest extends TestLogger { final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - AsyncWaitOperator operator = - new AsyncWaitOperator<>(new LazyAsyncFunction(), 3, AsyncDataStream.OutputMode.ORDERED); + AsyncWaitOperator operator = new AsyncWaitOperator<>( + new LazyAsyncFunction(), + TIMEOUT, + 3, + AsyncDataStream.OutputMode.ORDERED); final StreamConfig streamConfig = testHarness.getStreamConfig(); streamConfig.setStreamOperator(operator); @@ -481,8 +517,11 @@ public class AsyncWaitOperatorTest extends TestLogger { final OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - AsyncWaitOperator restoredOperator = - new AsyncWaitOperator<>(new MyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED); + AsyncWaitOperator restoredOperator = new AsyncWaitOperator<>( + new MyAsyncFunction(), + TIMEOUT, + 6, + AsyncDataStream.OutputMode.ORDERED); restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator); @@ -561,4 +600,74 @@ public class AsyncWaitOperatorTest extends TestLogger { return checkpointStateHandles; } } + + @Test + public void testAsyncTimeout() throws Exception { + final long timeout = 10L; + + final AsyncWaitOperator operator = new AsyncWaitOperator<>( + new LazyAsyncFunction(), + timeout, + 2, + AsyncDataStream.OutputMode.ORDERED); + + final Environment mockEnvironment = mock(Environment.class); + + final Configuration taskConfiguration = new Configuration(); + final ExecutionConfig executionConfig = new ExecutionConfig(); + final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup(); + final Configuration taskManagerConfiguration = new Configuration(); + final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TaskManagerRuntimeInfo("localhost", taskManagerConfiguration, "/tmp"); + final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1); + + when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration); + when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig); + when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup); + when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo); + when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo); + when(mockEnvironment.getUserClassLoader()).thenReturn(AsyncWaitOperatorTest.class.getClassLoader()); + + final OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment); + + final long initialTime = 0L; + final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + testHarness.setProcessingTime(initialTime); + + synchronized (testHarness.getCheckpointLock()) { + testHarness.processElement(new StreamRecord<>(1, initialTime)); + testHarness.setProcessingTime(initialTime + 5L); + testHarness.processElement(new StreamRecord<>(2, initialTime + 5L)); + } + + // trigger the timeout of the first stream record + testHarness.setProcessingTime(initialTime + timeout + 1L); + + // allow the second async stream record to be processed + LazyAsyncFunction.countDown(); + + // wait until all async collectors in the buffer have been emitted out. + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + expectedOutput.add(new StreamRecord<>(2, initialTime + 5L)); + + TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput()); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Throwable.class); + + verify(mockEnvironment).failExternally(argumentCaptor.capture()); + + Throwable failureCause = argumentCaptor.getValue(); + + Assert.assertNotNull(failureCause.getCause()); + Assert.assertTrue(failureCause.getCause() instanceof ExecutionException); + + Assert.assertNotNull(failureCause.getCause().getCause()); + Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 830cd6f..b623fa1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -54,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.FutureUtil; +import org.apache.flink.util.Preconditions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -107,29 +108,44 @@ public class AbstractStreamOperatorTestHarness { private volatile boolean wasFailedExternally = false; public AbstractStreamOperatorTestHarness( + StreamOperator operator, + int maxParallelism, + int numSubtasks, + int subtaskIndex) throws Exception { + + this( + operator, + maxParallelism, + numSubtasks, + subtaskIndex, + new MockEnvironment( + "MockTask", + 3 * 1024 * 1024, + new MockInputSplitProvider(), + 1024, + new Configuration(), + new ExecutionConfig(), + maxParallelism, + numSubtasks, + subtaskIndex)); + } + + public AbstractStreamOperatorTestHarness( StreamOperator operator, int maxParallelism, int numSubtasks, - int subtaskIndex) throws Exception { + int subtaskIndex, + final Environment environment) throws Exception { this.operator = operator; this.outputList = new ConcurrentLinkedQueue<>(); - Configuration underlyingConfig = new Configuration(); + Configuration underlyingConfig = environment.getTaskConfiguration(); this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); - this.executionConfig = new ExecutionConfig(); + this.executionConfig = environment.getExecutionConfig(); this.closableRegistry = new CloseableRegistry(); this.checkpointLock = new Object(); - environment = new MockEnvironment( - "MockTask", - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - underlyingConfig, - executionConfig, - maxParallelism, - numSubtasks, - subtaskIndex); + this.environment = Preconditions.checkNotNull(environment); mockTask = mock(StreamTask.class); processingTimeService = new TestProcessingTimeService(); http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 86fbaa0..9b02378 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -45,6 +46,15 @@ public class OneInputStreamOperatorTestHarness config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn)); } + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator operator, + TypeSerializer typeSerializerIn, + Environment environment) throws Exception { + this(operator, 1, 1, 0, environment); + + config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn)); + } + public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator) throws Exception { this(operator, 1, 1, 0); } @@ -59,6 +69,17 @@ public class OneInputStreamOperatorTestHarness this.oneInputOperator = operator; } + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator operator, + int maxParallelism, + int numTubtasks, + int subtaskIndex, + Environment environment) throws Exception { + super(operator, maxParallelism, numTubtasks, subtaskIndex, environment); + + this.oneInputOperator = operator; + } + public void processElement(IN value, long timestamp) throws Exception { processElement(new StreamRecord<>(value, timestamp)); } http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 3631965..8ea1bd8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -41,6 +41,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase { @@ -206,6 +207,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase @Test public void testAsyncWaitOperator() throws Exception { final int numElements = 5; + final long timeout = 1000L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -240,7 +242,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase } }; - DataStream orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1); + DataStream orderedResult = AsyncDataStream.orderedWait( + input, + function, + timeout, + TimeUnit.MILLISECONDS, + 2).setParallelism(1); // save result from ordered process final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0); @@ -249,8 +256,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase orderedResult.addSink(sinkFunction1).setParallelism(1); - - DataStream unorderedResult = AsyncDataStream.unorderedWait(input, function, 2); + DataStream unorderedResult = AsyncDataStream.unorderedWait( + input, + function, + timeout, + TimeUnit.MILLISECONDS, + 2); // save result from unordered process final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);