Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D55A01773C for ; Wed, 23 Sep 2015 19:12:22 +0000 (UTC) Received: (qmail 42412 invoked by uid 500); 23 Sep 2015 19:12:22 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 42366 invoked by uid 500); 23 Sep 2015 19:12:22 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 42350 invoked by uid 99); 23 Sep 2015 19:12:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Sep 2015 19:12:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 71476DFF6F; Wed, 23 Sep 2015 19:12:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Wed, 23 Sep 2015 19:12:23 -0000 Message-Id: <61368bc0b73c4c369e42f4b39aad5408@git.apache.org> In-Reply-To: <5882c90e2ce14db38e14ff25b8db97cb@git.apache.org> References: <5882c90e2ce14db38e14ff25b8db97cb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-2675] [streaming] Integrate Timer Service with StreamTask [FLINK-2675] [streaming] Integrate Timer Service with StreamTask This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase. This closes #1165 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717d54c8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717d54c8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717d54c8 Branch: refs/heads/master Commit: 717d54c826577fa0e1bb6c9a0e791ac15d040da9 Parents: 7212042 Author: Aljoscha Krettek Authored: Wed Jul 15 11:36:29 2015 +0200 Committer: Stephan Ewen Committed: Wed Sep 23 20:08:18 2015 +0200 ---------------------------------------------------------------------- .../streaming/api/operators/StreamOperator.java | 12 + .../streaming/api/operators/StreamSource.java | 12 +- .../runtime/io/StreamInputProcessor.java | 31 +- .../runtime/io/StreamTwoInputProcessor.java | 32 +- .../runtime/operators/TriggerTimer.java | 119 ------- .../runtime/operators/Triggerable.java | 2 +- ...ractAlignedProcessingTimeWindowOperator.java | 103 ++---- .../runtime/tasks/OneInputStreamTask.java | 7 +- .../runtime/tasks/SourceStreamTask.java | 25 +- .../streaming/runtime/tasks/StreamTask.java | 120 ++++++- .../runtime/tasks/StreamingRuntimeContext.java | 27 +- .../streaming/runtime/tasks/TimerException.java | 35 +++ .../runtime/tasks/TwoInputStreamTask.java | 6 +- .../api/state/StatefulOperatorTest.java | 3 +- .../runtime/operators/StreamTaskTimerTest.java | 165 ++++++++++ .../runtime/operators/TriggerTimerTest.java | 137 -------- ...AlignedProcessingTimeWindowOperatorTest.java | 130 +++++--- ...AlignedProcessingTimeWindowOperatorTest.java | 131 ++++---- .../runtime/tasks/StreamTaskTimerITCase.java | 312 +++++++++++++++++++ .../flink/streaming/util/MockContext.java | 3 +- .../util/OneInputStreamOperatorTestHarness.java | 4 +- .../streaming/util/SourceFunctionUtil.java | 3 +- .../util/TwoInputStreamOperatorTestHarness.java | 8 +- .../GroupedProcessingTimeWindowExample.java | 2 +- 24 files changed, 932 insertions(+), 497 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 0706c07..d65dc64 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -29,8 +29,14 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators * that process elements. * + *

* The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} * offers default implementation for the lifecycle and properties methods. + * + *

+ * Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using + * the timer service, timer callbacks are also guaranteed not to be called concurrently with + * methods on {@code StreamOperator}. * * @param The output type of the operator */ @@ -58,6 +64,7 @@ public interface StreamOperator extends Serializable { * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}. + *

* The method is expected to flush all remaining buffered data. Exceptions during this flushing * of buffered should be propagated, in order to cause the operation to be recognized asa failed, @@ -81,6 +88,11 @@ public interface StreamOperator extends Serializable { // Context and chaining properties // ------------------------------------------------------------------------ + /** + * Returns a context that allows the operator to query information about the execution and also + * to interact with systems such as broadcast variables and managed state. This also allows + * to register timers. + */ StreamingRuntimeContext getRuntimeContext(); /** http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index e100fa6..ecf799b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; public class StreamSource extends AbstractUdfStreamOperator> implements StreamOperator { private static final long serialVersionUID = 1L; + private transient SourceFunction.SourceContext ctx; public StreamSource(SourceFunction sourceFunction) { super(sourceFunction); @@ -41,9 +42,7 @@ public class StreamSource extends AbstractUdfStreamOperator> collector) throws Exception { - - SourceFunction.SourceContext ctx; + public void run(final Object lockingObject, final Output> collector) throws Exception { if (userFunction instanceof EventTimeSourceFunction) { ctx = new ManualWatermarkContext(lockingObject, collector); @@ -64,6 +63,7 @@ public class StreamSource extends AbstractUdfStreamOperator extends AbstractUdfStreamOperator= watermarkInterval) { + if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) { synchronized (lockingObject) { - if (watermarkTime - lastWatermarkTime >= watermarkInterval) { + if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) { output.emitWatermark(new Watermark(watermarkTime)); lastWatermarkTime = watermarkTime; } @@ -210,7 +210,7 @@ public class StreamSource extends AbstractUdfStreamOperator= watermarkInterval) { + if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) { output.emitWatermark(new Watermark(watermarkTime)); lastWatermarkTime = watermarkTime; } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 8ce8a01..f50ddcd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -47,8 +47,14 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. * - *

This also keeps track of {@link Watermark} events and forwards them to event subscribers - * once the {@link Watermark} from all inputs advances.

+ *

+ * This also keeps track of {@link Watermark} events and forwards them to event subscribers + * once the {@link Watermark} from all inputs advances. + * + *

+ * Forwarding elements or watermarks must be protected by synchronizing on the given lock + * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently + * with the timer callback or other things. * * @param The type of the record that can be read with this record reader. */ @@ -118,9 +124,9 @@ public class StreamInputProcessor { } lastEmittedWatermark = Long.MIN_VALUE; } - - - public boolean processInput(OneInputStreamOperator streamOperator) throws Exception { + + @SuppressWarnings("unchecked") + public boolean processInput(OneInputStreamOperator streamOperator, Object lock) throws Exception { if (isFinished) { return false; } @@ -147,19 +153,22 @@ public class StreamInputProcessor { } if (newMinWatermark > lastEmittedWatermark) { lastEmittedWatermark = newMinWatermark; - streamOperator.processWatermark(new Watermark(lastEmittedWatermark)); + synchronized (lock) { + streamOperator.processWatermark(new Watermark(lastEmittedWatermark)); + } } } continue; - } - else { + } else { // now we can do the actual processing StreamRecord record = recordOrWatermark.asRecord(); StreamingRuntimeContext ctx = streamOperator.getRuntimeContext(); - if (ctx != null) { - ctx.setNextInput(record); + synchronized (lock) { + if (ctx != null) { + ctx.setNextInput(record); + } + streamOperator.processElement(record); } - streamOperator.processElement(record); return true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 6322cc8..882037e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -52,6 +52,11 @@ import java.util.Collection; * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances. * + *

+ * Forwarding elements or watermarks must be protected by synchronizing on the given lock + * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently + * with the timer callback or other things. + * * @param The type of the records that arrive on the first input * @param The type of the records that arrive on the second input */ @@ -151,8 +156,9 @@ public class StreamTwoInputProcessor { Arrays.fill(watermarks2, Long.MIN_VALUE); lastEmittedWatermark2 = Long.MIN_VALUE; } - - public boolean processInput(TwoInputStreamOperator streamOperator) throws Exception { + + @SuppressWarnings("unchecked") + public boolean processInput(TwoInputStreamOperator streamOperator, Object lock) throws Exception { if (isFinished) { return false; } @@ -175,11 +181,13 @@ public class StreamTwoInputProcessor { if (currentChannel < numInputChannels1) { StreamElement recordOrWatermark = deserializationDelegate1.getInstance(); if (recordOrWatermark.isWatermark()) { - handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel); + handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock); continue; } else { - streamOperator.processElement1(recordOrWatermark.asRecord()); + synchronized (lock) { + streamOperator.processElement1(recordOrWatermark.asRecord()); + } return true; } @@ -187,11 +195,13 @@ public class StreamTwoInputProcessor { else { StreamElement recordOrWatermark = deserializationDelegate2.getInstance(); if (recordOrWatermark.isWatermark()) { - handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel); + handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock); continue; } else { - streamOperator.processElement2(recordOrWatermark.asRecord()); + synchronized (lock) { + streamOperator.processElement2(recordOrWatermark.asRecord()); + } return true; } } @@ -224,7 +234,7 @@ public class StreamTwoInputProcessor { } } - private void handleWatermark(TwoInputStreamOperator operator, Watermark mark, int channelIndex) throws Exception { + private void handleWatermark(TwoInputStreamOperator operator, Watermark mark, int channelIndex, Object lock) throws Exception { if (channelIndex < numInputChannels1) { long watermarkMillis = mark.getTimestamp(); if (watermarkMillis > watermarks1[channelIndex]) { @@ -235,7 +245,9 @@ public class StreamTwoInputProcessor { } if (newMinWatermark > lastEmittedWatermark1) { lastEmittedWatermark1 = newMinWatermark; - operator.processWatermark1(new Watermark(lastEmittedWatermark1)); + synchronized (lock) { + operator.processWatermark1(new Watermark(lastEmittedWatermark1)); + } } } } else { @@ -249,7 +261,9 @@ public class StreamTwoInputProcessor { } if (newMinWatermark > lastEmittedWatermark2) { lastEmittedWatermark2 = newMinWatermark; - operator.processWatermark2(new Watermark(lastEmittedWatermark2)); + synchronized (lock) { + operator.processWatermark2(new Watermark(lastEmittedWatermark2)); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java deleted file mode 100644 index 7528eb3..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators; - -import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, - * which means that never more than one trigger will be executed at the same time. - *

- * This timer generally maintains order of trigger events. This means that for two triggers scheduled at - * different times, the one scheduled for the later time will be executed after the one scheduled for the - * earlier time. - */ -public class TriggerTimer { - - /** The thread group that holds all trigger timer threads */ - public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); - - /** The executor service that */ - private final ScheduledExecutorService scheduler; - - - /** - * Creates a new trigger timer, where the timer thread has the default name "TriggerTimer Thread". - */ - public TriggerTimer() { - this("TriggerTimer Thread"); - } - - /** - * Creates a new trigger timer, where the timer thread has the given name. - * - * @param triggerName The name for the trigger thread. - */ - public TriggerTimer(String triggerName) { - this.scheduler = Executors.newSingleThreadScheduledExecutor( - new DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName)); - } - - /** - * Schedules a new trigger event. The trigger event will occur roughly at the given timestamp. - * If the timestamp is in the past (or now), the trigger will be queued for immediate execution. Note that other - * triggers that are to be executed now will be executed before this trigger. - * - * @param target The target to be triggered. - * @param timestamp The timestamp when the trigger should occur, and the timestamp given - * to the trigger-able target. - */ - public void scheduleTriggerAt(Triggerable target, long timestamp) { - long delay = Math.max(timestamp - System.currentTimeMillis(), 0); - - scheduler.schedule( - new TriggerTask(target, timestamp), - delay, - TimeUnit.MILLISECONDS); - } - - /** - * Shuts down the trigger timer, canceling all pending triggers and stopping the trigger thread. - */ - public void shutdown() { - scheduler.shutdownNow(); - } - - /** - * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original - * shutdown method was never called. - *

- * This should not be relied upon! It will cause shutdown to happen much later than if manual - * shutdown is attempted, and cause threads to linger for longer than needed. - */ - @Override - @SuppressWarnings("FinalizeDoesntCallSuperFinalize") - protected void finalize() { - shutdown(); - } - - // ------------------------------------------------------------------------ - - /** - * Internal task that is invoked by the scheduled executor and triggers the target. - */ - private static final class TriggerTask implements Runnable { - - private final Triggerable target; - private final long timestamp; - - TriggerTask(Triggerable target, long timestamp) { - this.target = target; - this.timestamp = timestamp; - } - - @Override - public void run() { - target.trigger(timestamp); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java index 626f087..ac1a543 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java @@ -34,5 +34,5 @@ public interface Triggerable { * * @param timestamp The timestamp for which the trigger event was scheduled. */ - void trigger(long timestamp); + void trigger(long timestamp) throws Exception ; } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java index f5f576d..2e926bc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java @@ -24,12 +24,10 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.MathUtils; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.TriggerTimer; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -41,9 +39,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator private static final long MIN_SLIDE_TIME = 50; - - private final SerializableObject lock = new SerializableObject(); - // ----- fields for operator parametrization ----- private final Function function; @@ -60,14 +55,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator private transient TimestampedCollector out; - private transient TriggerTimer triggerTimer; - private transient long nextEvaluationTime; private transient long nextSlideTime; - private transient volatile Throwable asyncError; - - protected AbstractAlignedProcessingTimeWindowOperator( Function function, KeySelector keySelector, @@ -123,47 +113,31 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator nextEvaluationTime = now + windowSlide - (now % windowSlide); nextSlideTime = now + paneSize - (now % paneSize); - // start the trigger timer - triggerTimer = new TriggerTimer("Trigger for " + getRuntimeContext().getTaskName()); - - // schedule the first trigger - triggerTimer.scheduleTriggerAt(this, Math.min(nextEvaluationTime, nextSlideTime)); + getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this); } @Override public void close() throws Exception { - // acquire the lock during shutdown, to prevent trigger calls at the same time - synchronized (lock) { - final long finalWindowTimestamp = nextEvaluationTime; - - // early stop the triggering thread, so it does not attempt to return any more data - stopTriggers(); + final long finalWindowTimestamp = nextEvaluationTime; - // make sure we had no asynchronous error so far - checkErroneous(); - - // emit the remaining data - computeWindow(finalWindowTimestamp); - } + // early stop the triggering thread, so it does not attempt to return any more data + stopTriggers(); + + // emit the remaining data + computeWindow(finalWindowTimestamp); } @Override public void dispose() { // acquire the lock during shutdown, to prevent trigger calls at the same time - synchronized (lock) { - // fail-safe stop of the triggering thread (in case of an error) - stopTriggers(); - - // release the panes - panes.dispose(); - } + // fail-safe stop of the triggering thread (in case of an error) + stopTriggers(); + + // release the panes + panes.dispose(); } private void stopTriggers() { - if (triggerTimer != null) { - triggerTimer.shutdown(); - } - // reset the action timestamps. this makes sure any pending triggers will not evaluate nextEvaluationTime = -1L; nextSlideTime = -1L; @@ -175,10 +149,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator @Override public void processElement(StreamRecord element) throws Exception { - synchronized (lock) { - checkErroneous(); - panes.addElementToLatestPane(element.getValue()); - } + panes.addElementToLatestPane(element.getValue()); } @Override @@ -187,42 +158,24 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator } @Override - public void trigger(long timestamp) { - synchronized (lock) { - // first we check if we actually trigger the window function - if (timestamp == nextEvaluationTime) { - // compute and output the results - try { - computeWindow(timestamp); - } - catch (Throwable t) { - this.asyncError = t; - } + public void trigger(long timestamp) throws Exception { + // first we check if we actually trigger the window function + if (timestamp == nextEvaluationTime) { + // compute and output the results + computeWindow(timestamp); - nextEvaluationTime += windowSlide; - } - - // check if we slide the panes by one. this may happen in addition to the - // window computation, or just by itself - if (timestamp == nextSlideTime) { - try { - panes.slidePanes(numPanesPerWindow); - } - catch (Throwable t) { - this.asyncError = t; - } - nextSlideTime += paneSize; - } - - long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - triggerTimer.scheduleTriggerAt(this, nextTriggerTime); + nextEvaluationTime += windowSlide; } - } - - private void checkErroneous() throws Exception { - if (asyncError != null) { - throw new Exception("Error while computing and producing window result", asyncError); + + // check if we slide the panes by one. this may happen in addition to the + // window computation, or just by itself + if (timestamp == nextSlideTime) { + panes.slidePanes(numPanesPerWindow); + nextSlideTime += paneSize; } + + long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); + getRuntimeContext().registerTimer(nextTriggerTime, this); } private void computeWindow(long timestamp) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 8ef02b2..89eac92 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -30,7 +30,6 @@ public class OneInputStreamTask extends StreamTask inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); @@ -53,7 +52,11 @@ public class OneInputStreamTask extends StreamTask extends StreamTask> { protected void run() throws Exception { final Object checkpointLock = getCheckpointLock(); - final SourceOutput> output = - new SourceOutput>(outputHandler.getOutput(), checkpointLock); + final SourceOutput> output = new SourceOutput<>(outputHandler.getOutput(), checkpointLock); streamOperator.run(checkpointLock, output); } @@ -65,11 +64,18 @@ public class SourceStreamTask extends StreamTask> { // ------------------------------------------------------------------------ - // TODO: - // does this help with anything? The losk should be already held by the source function that - // emits. If that one does not hold the lock, then this does not help either. - - private static class SourceOutput implements Output { + /** + * Special output for sources that ensures that sources synchronize on the lock object before + * emitting elements. + * + *

+ * This is required to ensure that no concurrent method calls on operators later in the chain + * can occur. When operators register a timer the timer callback is synchronized + * on the same lock object. + * + * @param The type of elements emitted by the source. + */ + private class SourceOutput implements Output { private final Output output; private final Object lockObject; @@ -89,6 +95,9 @@ public class SourceStreamTask extends StreamTask> { @Override public void collect(T record) { synchronized (lockObject) { + if (timerException != null) { + throw timerException; + } output.collect(record); } } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d357a4d..1b35350 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -21,6 +21,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.NotNullPredicate; @@ -38,6 +41,7 @@ import org.apache.flink.runtime.state.FileStateHandle; import org.apache.flink.runtime.state.LocalStateHandle; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StatefulStreamOperator; @@ -45,12 +49,15 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.state.OperatorStateHandle; import org.apache.flink.streaming.api.state.WrapperStateHandle; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Base Invokable for all {@code StreamTasks}. A {@code StreamTask} processes input and forwards + * elements and watermarks to a {@link StreamOperator}. + * *

  *     
  *  -- registerInputOutput()
@@ -68,6 +75,11 @@ import org.slf4j.LoggerFactory;
  *        +----> common cleanup
  *        +----> operator specific cleanup()
  * 
+ * + *

+ * {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a + * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods + * are called concurrently. * * @param * @param @@ -76,8 +88,11 @@ public abstract class StreamTask> extends Abs private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); - - private final Object checkpointLock = new Object(); + /** + * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that + * we don't have concurrent method calls. + */ + protected final Object lock = new Object(); private final EventListener checkpointBarrierListener; @@ -88,7 +103,19 @@ public abstract class StreamTask> extends Abs protected StreamConfig configuration; protected ClassLoader userClassLoader; - + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers"); + + /** The executor service that */ + private ScheduledExecutorService timerService; + + /** + * This field is used to forward an exception that is caught in the timer thread. Subclasses + * must ensure that exceptions stored here get thrown on the actual execution Thread. + */ + protected volatile TimerException timerException = null; + protected OutputHandler outputHandler; protected O streamOperator; @@ -136,7 +163,7 @@ public abstract class StreamTask> extends Abs Map> accumulatorMap = accumulatorRegistry.getUserMap(); AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); - outputHandler = new OutputHandler(this, accumulatorMap, reporter); + outputHandler = new OutputHandler<>(this, accumulatorMap, reporter); if (streamOperator != null) { // IterationHead and IterationTail don't have an Operator... @@ -148,7 +175,10 @@ public abstract class StreamTask> extends Abs } hasChainedOperators = outputHandler.getChainedOperators().size() != 1; - + + this.timerService = Executors.newSingleThreadScheduledExecutor( + new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); + // operator specific initialization init(); @@ -175,7 +205,7 @@ public abstract class StreamTask> extends Abs // make sure no further checkpoint and notification actions happen. // we make sure that no other thread is currently in the locked scope before // we close the operators by trying to acquire the checkpoint scope lock - synchronized (checkpointLock) {} + synchronized (lock) {} // this is part of the main logic, so if this fails, the task is considered failed closeAllOperators(); @@ -190,6 +220,8 @@ public abstract class StreamTask> extends Abs } finally { isRunning = false; + + timerService.shutdown(); // release the output resources. this method should never fail. if (outputHandler != null) { @@ -259,6 +291,25 @@ public abstract class StreamTask> extends Abs } } + /** + * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original + * shutdown method was never called. + * + *

+ * This should not be relied upon! It will cause shutdown to happen much later than if manual + * shutdown is attempted, and cause threads to linger for longer than needed. + */ + @Override + @SuppressWarnings("FinalizeDoesntCallSuperFinalize") + protected void finalize() { + if (timerService != null) { + if (!timerService.isTerminated()) { + LOG.warn("Timer service was not shut down. Shutting down in finalize()."); + } + timerService.shutdown(); + } + } + // ------------------------------------------------------------------------ // Access to properties and utilities // ------------------------------------------------------------------------ @@ -272,7 +323,7 @@ public abstract class StreamTask> extends Abs } public Object getCheckpointLock() { - return checkpointLock; + return lock; } // ------------------------------------------------------------------------ @@ -303,12 +354,11 @@ public abstract class StreamTask> extends Abs LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); - synchronized (checkpointLock) { + synchronized (lock) { if (isRunning) { try { // We wrap the states of the chained operators in a list, marking non-stateful operators with null - List, Map>> chainedStates - = new ArrayList, Map>>(); + List, Map>> chainedStates = new ArrayList<>(); // A wrapper handle is created for the List of statehandles WrapperStateHandle stateHandle; @@ -352,7 +402,7 @@ public abstract class StreamTask> extends Abs @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - synchronized (checkpointLock) { + synchronized (lock) { if (isRunning) { for (StreamOperator chainedOperator : outputHandler.getChainedOperators()) { if (chainedOperator instanceof StatefulStreamOperator) { @@ -412,6 +462,18 @@ public abstract class StreamTask> extends Abs JOBMANAGER, FILESYSTEM } + /** + * Registers a timer. + */ + public void registerTimer(final long timestamp, final Triggerable target) { + long delay = Math.max(timestamp - System.currentTimeMillis(), 0); + + timerService.schedule( + new TriggerTask(this, lock, target, timestamp), + delay, + TimeUnit.MILLISECONDS); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -420,7 +482,7 @@ public abstract class StreamTask> extends Abs KeySelector statePartitioner = conf.getStatePartitioner(userClassLoader); return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(), - statePartitioner, getStateHandleProvider(), accumulatorMap); + statePartitioner, getStateHandleProvider(), accumulatorMap, this); } @Override @@ -446,4 +508,36 @@ public abstract class StreamTask> extends Abs } } } + + /** + * Internal task that is invoked by the timer service and triggers the target. + */ + private static final class TriggerTask implements Runnable { + + private final Object lock; + private final Triggerable target; + private final long timestamp; + private final StreamTask task; + + TriggerTask(StreamTask task, final Object lock, Triggerable target, long timestamp) { + this.task = task; + this.lock = lock; + this.target = target; + this.timestamp = timestamp; + } + + @Override + public void run() { + synchronized (lock) { + try { + target.trigger(timestamp); + } catch (Throwable t) { + LOG.error("Caught exception while processing timer.", t); + if (task.timerException == null) { + task.timerException = new TimerException(t); + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java index b82888e..0ac352b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState; import org.apache.flink.streaming.api.state.StreamOperatorState; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** @@ -52,16 +53,21 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { private final List> partitionedStates; private final KeySelector statePartitioner; private final StateHandleProvider provider; - - + + /** + * We need access to the {@link StreamTask} to register timer callbacks. + */ + private final StreamTask streamTask; + @SuppressWarnings("unchecked") public StreamingRuntimeContext( Environment env, ExecutionConfig executionConfig, KeySelector statePartitioner, StateHandleProvider provider, - Map> accumulatorMap) { - + Map> accumulatorMap, + StreamTask streamTask) { + super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getUserClassLoader(), executionConfig, env.getDistributedCacheEntries(), accumulatorMap); @@ -71,6 +77,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { this.states = new HashMap<>(); this.partitionedStates = new LinkedList<>(); this.provider = (StateHandleProvider) provider; + this.streamTask = streamTask; } /** @@ -167,6 +174,18 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { } /** + * Register a timer callback. At the specified time the + * {@code Triggerable } will be invoked. This call is guaranteed to not happen + * concurrently with method calls on the operator. + * + * @param time The absolute time in milliseconds. + * @param target The target to be triggered. + */ + public void registerTimer(long time, Triggerable target) { + streamTask.registerTimer(time, target); + } + + /** * Sets the next input of the underlying operators, used to access * partitioned states. * http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java new file mode 100644 index 0000000..3e1c1e5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.tasks; + +/** + * {@code RuntimeException} for wrapping exceptions that are thrown in the timer callback of + * the timer service in {@link StreamTask}. + */ +public class TimerException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public TimerException(Throwable cause) { + super(cause); + } + + @Override + public String toString() { + return "TimerException{" + getCause() + "}"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 5d0497d..25f1a76 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -75,7 +75,11 @@ public class TwoInputStreamTask extends StreamTask(), - new HashMap>()); + new HashMap>(), + null); StreamMap op = new StreamMap(new StatefulMapper()); http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java new file mode 100644 index 0000000..2aed041 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +/** + * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ResultPartitionWriter.class}) +public class StreamTaskTimerTest { + + @Test + public void testOpenCloseAndTimestamps() throws Exception { + final OneInputStreamTask mapTask = new OneInputStreamTask<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + StreamMap mapOperator = new StreamMap<>(null); + streamConfig.setStreamOperator(mapOperator); + + testHarness.invoke(); + + // first one spawns thread + mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) {} + }); + + assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount()); + + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + // thread needs to die in time + long deadline = System.currentTimeMillis() + 4000; + while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + + assertEquals("Trigger timer thread did not properly shut down", + 0, StreamTask.TRIGGER_THREAD_GROUP.activeCount()); + } + + @Test + public void checkScheduledTimestampe() { + try { + + final OneInputStreamTask mapTask = new OneInputStreamTask<>(); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + StreamMap mapOperator = new StreamMap<>(null); + streamConfig.setStreamOperator(mapOperator); + + testHarness.invoke(); + + final AtomicReference errorRef = new AtomicReference<>(); + + final long t1 = System.currentTimeMillis(); + final long t2 = System.currentTimeMillis() - 200; + final long t3 = System.currentTimeMillis() + 100; + final long t4 = System.currentTimeMillis() + 200; + + mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0)); + mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1)); + mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2)); + mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3)); + + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + + long deadline = System.currentTimeMillis() + 5000; + while (errorRef.get() == null && + ValidatingTriggerable.numInSequence < 4 && + System.currentTimeMillis() < deadline) + { + Thread.sleep(100); + } + + // handle errors + if (errorRef.get() != null) { + errorRef.get().printStackTrace(); + fail(errorRef.get().getMessage()); + } + + assertEquals(4, ValidatingTriggerable.numInSequence); + + + // wait until the trigger thread is shut down. otherwise, the other tests may become unstable + deadline = System.currentTimeMillis() + 4000; + while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + + assertEquals("Trigger timer thread did not properly shut down", + 0, StreamTask.TRIGGER_THREAD_GROUP.activeCount()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static class ValidatingTriggerable implements Triggerable { + + static int numInSequence; + + private final AtomicReference errorRef; + + private final long expectedTimestamp; + private final int expectedInSequence; + + private ValidatingTriggerable(AtomicReference errorRef, long expectedTimestamp, int expectedInSequence) { + this.errorRef = errorRef; + this.expectedTimestamp = expectedTimestamp; + this.expectedInSequence = expectedInSequence; + } + + @Override + public void trigger(long timestamp) { + try { + assertEquals(expectedTimestamp, timestamp); + assertEquals(expectedInSequence, numInSequence); + numInSequence++; + } + catch (Throwable t) { + errorRef.compareAndSet(null, t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java deleted file mode 100644 index 1349cb3..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators; - -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; - -public class TriggerTimerTest { - - @Test - public void testThreadGroupAndShutdown() { - try { - TriggerTimer timer = new TriggerTimer(); - - // first one spawns thread - timer.scheduleTriggerAt(new Triggerable() { - @Override - public void trigger(long timestamp) {} - }, System.currentTimeMillis()); - - assertEquals(1, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); - - // thread needs to die in time - timer.shutdown(); - - long deadline = System.currentTimeMillis() + 2000; - while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { - Thread.sleep(10); - } - - assertEquals("Trigger timer thread did not properly shut down", - 0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void checkScheduledTimestampe() { - try { - final TriggerTimer timer = new TriggerTimer(); - - final AtomicReference errorRef = new AtomicReference<>(); - - final long t1 = System.currentTimeMillis(); - final long t2 = System.currentTimeMillis() - 200; - final long t3 = System.currentTimeMillis() + 100; - final long t4 = System.currentTimeMillis() + 200; - - timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t1, 0), t1); - timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t2, 1), t2); - timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t3, 2), t3); - timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t4, 3), t4); - - long deadline = System.currentTimeMillis() + 5000; - while (errorRef.get() == null && - ValidatingTriggerable.numInSequence < 4 && - System.currentTimeMillis() < deadline) - { - Thread.sleep(100); - } - - // handle errors - if (errorRef.get() != null) { - errorRef.get().printStackTrace(); - fail(errorRef.get().getMessage()); - } - - assertEquals(4, ValidatingTriggerable.numInSequence); - - timer.shutdown(); - - // wait until the trigger thread is shut down. otherwise, the other tests may become unstable - deadline = System.currentTimeMillis() + 2000; - while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { - Thread.sleep(10); - } - - assertEquals("Trigger timer thread did not properly shut down", - 0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - - private static class ValidatingTriggerable implements Triggerable { - - static int numInSequence; - - private final AtomicReference errorRef; - - private final long expectedTimestamp; - private final int expectedInSequence; - - private ValidatingTriggerable(AtomicReference errorRef, long expectedTimestamp, int expectedInSequence) { - this.errorRef = errorRef; - this.expectedTimestamp = expectedTimestamp; - this.expectedInSequence = expectedInSequence; - } - - @Override - public void trigger(long timestamp) { - try { - assertEquals(expectedTimestamp, timestamp); - assertEquals(expectedInSequence, numInSequence); - numInSequence++; - } - catch (Throwable t) { - errorRef.compareAndSet(null, t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 685939b..19801f1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -22,17 +22,24 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.TriggerTimer; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; import static org.junit.Assert.*; @@ -71,7 +78,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { public void checkNoTriggerThreadsRunning() { // make sure that all the threads we trigger are shut down long deadline = System.currentTimeMillis() + 5000; - while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { try { Thread.sleep(10); } @@ -79,7 +86,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } assertTrue("Not all trigger threads where properly shut down", - TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0); + StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0); } // ------------------------------------------------------------------------ @@ -281,12 +288,37 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testTumblingWindowSingleElements() { + final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + try { final CollectingOutput out = new CollectingOutput<>(50); final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); when(mockContext.getTaskName()).thenReturn("Test task name"); + final Object lock = new Object(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + final Long timestamp = (Long) invocationOnMock.getArguments()[0]; + final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; + timerService.schedule( + new Callable() { + @Override + public Object call() throws Exception { + synchronized (lock) { + target.trigger(timestamp); + } + return null; + } + }, + timestamp - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + return null; + } + }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class)); + // tumbling window that triggers every 20 milliseconds AbstractAlignedProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50); @@ -294,16 +326,22 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { op.setup(out, mockContext); op.open(new Configuration()); - op.processElement(new StreamRecord(1)); - op.processElement(new StreamRecord(2)); + synchronized (lock) { + op.processElement(new StreamRecord(1)); + op.processElement(new StreamRecord(2)); + } out.waitForNElements(2, 60000); - op.processElement(new StreamRecord(3)); - op.processElement(new StreamRecord(4)); - op.processElement(new StreamRecord(5)); + synchronized (lock) { + op.processElement(new StreamRecord(3)); + op.processElement(new StreamRecord(4)); + op.processElement(new StreamRecord(5)); + } out.waitForNElements(5, 60000); - op.processElement(new StreamRecord(6)); + synchronized (lock) { + op.processElement(new StreamRecord(6)); + } out.waitForNElements(6, 60000); List result = out.getElements(); @@ -318,26 +356,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + } finally { + timerService.shutdown(); } } @Test public void testSlidingWindowSingleElements() { + final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + try { final CollectingOutput out = new CollectingOutput<>(50); final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); when(mockContext.getTaskName()).thenReturn("Test task name"); + final Object lock = new Object(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + final Long timestamp = (Long) invocationOnMock.getArguments()[0]; + final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; + timerService.schedule( + new Callable() { + @Override + public Object call() throws Exception { + synchronized (lock) { + target.trigger(timestamp); + } + return null; + } + }, + timestamp - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + return null; + } + }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class)); + // tumbling window that triggers every 20 milliseconds AbstractAlignedProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50); op.setup(out, mockContext); op.open(new Configuration()); - - op.processElement(new StreamRecord(1)); - op.processElement(new StreamRecord(2)); + + synchronized (lock) { + op.processElement(new StreamRecord(1)); + op.processElement(new StreamRecord(2)); + } // each element should end up in the output three times // wait until the elements have arrived 6 times in the output @@ -355,6 +422,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + } finally { + timerService.shutdown(); } } @@ -395,43 +464,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test - public void testPropagateExceptionsFromTrigger() { - try { - final CollectingOutput out = new CollectingOutput<>(); - - final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); - when(mockContext.getTaskName()).thenReturn("Test task name"); - - KeyedWindowFunction failingFunction = new FailingFunction(100); - - AbstractAlignedProcessingTimeWindowOperator op = - new AccumulatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 50, 50); - - op.setup(out, mockContext); - op.open(new Configuration()); - - try { - int num = 0; - while (num < Integer.MAX_VALUE) { - op.processElement(new StreamRecord(num++)); - Thread.sleep(1); - } - fail("This should really have failed with an exception quite a while ago..."); - } - catch (Exception e) { - assertNotNull(e.getCause()); - assertTrue(e.getCause().getMessage().contains("Artificial Test Exception")); - } - - op.dispose(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test public void testPropagateExceptionsFromClose() { try { final CollectingOutput out = new CollectingOutput<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 464df32..0ff974c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -22,22 +22,33 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.TriggerTimer; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,7 +81,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { public void checkNoTriggerThreadsRunning() { // make sure that all the threads we trigger are shut down long deadline = System.currentTimeMillis() + 5000; - while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { try { Thread.sleep(10); } @@ -78,7 +89,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } assertTrue("Not all trigger threads where properly shut down", - TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0); + StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0); } // ------------------------------------------------------------------------ @@ -222,13 +233,38 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } @Test - public void testTumblingWindowDuplicateElements() { + public void testTumblingWindowDuplicateElements() { + + final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + try { final int windowSize = 50; final CollectingOutput out = new CollectingOutput<>(windowSize); final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); when(mockContext.getTaskName()).thenReturn("Test task name"); + + final Object lock = new Object(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + final Long timestamp = (Long) invocationOnMock.getArguments()[0]; + final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; + timerService.schedule( + new Callable() { + @Override + public Object call() throws Exception { + synchronized (lock) { + target.trigger(timestamp); + } + return null; + } + }, + timestamp - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + return null; + } + }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class)); AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator<>( @@ -245,8 +281,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { while (window <= numWindows) { long nextTime = op.getNextEvaluationTime(); int val = ((int) nextTime) ^ ((int) (nextTime >>> 32)); - - op.processElement(new StreamRecord(val)); + + synchronized (lock) { + op.processElement(new StreamRecord(val)); + } if (nextTime != previousNextTime) { window++; @@ -272,6 +310,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + } finally { + timerService.shutdown(); } } @@ -332,21 +372,47 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testSlidingWindowSingleElements() { + final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + try { final CollectingOutput out = new CollectingOutput<>(50); final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); when(mockContext.getTaskName()).thenReturn("Test task name"); + final Object lock = new Object(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + final Long timestamp = (Long) invocationOnMock.getArguments()[0]; + final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; + timerService.schedule( + new Callable() { + @Override + public Object call() throws Exception { + synchronized (lock) { + target.trigger(timestamp); + } + return null; + } + }, + timestamp - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + return null; + } + }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class)); + // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50); op.setup(out, mockContext); op.open(new Configuration()); - - op.processElement(new StreamRecord(1)); - op.processElement(new StreamRecord(2)); + + synchronized (lock) { + op.processElement(new StreamRecord(1)); + op.processElement(new StreamRecord(2)); + } // each element should end up in the output three times // wait until the elements have arrived 6 times in the output @@ -364,6 +430,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + } finally { + timerService.shutdown(); } } @@ -403,51 +471,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } @Test - public void testPropagateExceptionsFromTrigger() { - try { - final CollectingOutput out = new CollectingOutput<>(); - - final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); - when(mockContext.getTaskName()).thenReturn("Test task name"); - - ReduceFunction failingFunction = new FailingFunction(100); - - AggregatingProcessingTimeWindowOperator op = - new AggregatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 200, 50); - - op.setup(out, mockContext); - op.open(new Configuration()); - - try { - long nextWindowTime = op.getNextEvaluationTime(); - int val = 0; - for (int num = 0; num < Integer.MAX_VALUE; num++) { - op.processElement(new StreamRecord(val++)); - Thread.sleep(1); - - // when the window has advanced, reset the value, to generate the same values - // in the next pane again. This causes the aggregation on trigger to reduce values - if (op.getNextEvaluationTime() != nextWindowTime) { - nextWindowTime = op.getNextEvaluationTime(); - val = 0; - } - } - fail("This should really have failed with an exception quite a while ago..."); - } - catch (Exception e) { - assertNotNull(e.getCause()); - assertTrue(e.getCause().getMessage().contains("Artificial Test Exception")); - } - - op.dispose(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test public void testPropagateExceptionsFromProcessElement() { try { final CollectingOutput out = new CollectingOutput<>();