flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-2675] [streaming] Integrate Timer Service with StreamTask
Date Wed, 23 Sep 2015 19:12:23 GMT
[FLINK-2675] [streaming] Integrate Timer Service with StreamTask

This integrates the timer as a service in StreamTask that
StreamOperators can use by calling a method on the
StreamingRuntimeContext.

This also ensures that the timer callbacks can not be called
concurrently with other methods on the StreamOperator. This behaviour is
ensured by an ITCase.

This closes #1165


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717d54c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717d54c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717d54c8

Branch: refs/heads/master
Commit: 717d54c826577fa0e1bb6c9a0e791ac15d040da9
Parents: 7212042
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Jul 15 11:36:29 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Sep 23 20:08:18 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/StreamOperator.java |  12 +
 .../streaming/api/operators/StreamSource.java   |  12 +-
 .../runtime/io/StreamInputProcessor.java        |  31 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  32 +-
 .../runtime/operators/TriggerTimer.java         | 119 -------
 .../runtime/operators/Triggerable.java          |   2 +-
 ...ractAlignedProcessingTimeWindowOperator.java | 103 ++----
 .../runtime/tasks/OneInputStreamTask.java       |   7 +-
 .../runtime/tasks/SourceStreamTask.java         |  25 +-
 .../streaming/runtime/tasks/StreamTask.java     | 120 ++++++-
 .../runtime/tasks/StreamingRuntimeContext.java  |  27 +-
 .../streaming/runtime/tasks/TimerException.java |  35 +++
 .../runtime/tasks/TwoInputStreamTask.java       |   6 +-
 .../api/state/StatefulOperatorTest.java         |   3 +-
 .../runtime/operators/StreamTaskTimerTest.java  | 165 ++++++++++
 .../runtime/operators/TriggerTimerTest.java     | 137 --------
 ...AlignedProcessingTimeWindowOperatorTest.java | 130 +++++---
 ...AlignedProcessingTimeWindowOperatorTest.java | 131 ++++----
 .../runtime/tasks/StreamTaskTimerITCase.java    | 312 +++++++++++++++++++
 .../flink/streaming/util/MockContext.java       |   3 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +-
 .../streaming/util/SourceFunctionUtil.java      |   3 +-
 .../util/TwoInputStreamOperatorTestHarness.java |   8 +-
 .../GroupedProcessingTimeWindowExample.java     |   2 +-
 24 files changed, 932 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 0706c07..d65dc64 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -29,8 +29,14 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
  * that process elements.
  * 
+ * <p>
  * The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
  * offers default implementation for the lifecycle and properties methods.
+ *
+ * <p>
+ * Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
+ * the timer service, timer callbacks are also guaranteed not to be called concurrently with
+ * methods on {@code StreamOperator}.
  * 
  * @param <OUT> The output type of the operator
  */
@@ -58,6 +64,7 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
 	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
 	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+
 	 * <p>
 	 * The method is expected to flush all remaining buffered data. Exceptions during this flushing
 	 * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
@@ -81,6 +88,11 @@ public interface StreamOperator<OUT> extends Serializable {
 	//  Context and chaining properties
 	// ------------------------------------------------------------------------
 	
+	/**
+	 * Returns a context that allows the operator to query information about the execution and also
+	 * to interact with systems such as broadcast variables and managed state. This also allows
+	 * to register timers.
+	 */
 	StreamingRuntimeContext getRuntimeContext();
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index e100fa6..ecf799b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction<T>> implements StreamOperator<T> {
 
 	private static final long serialVersionUID = 1L;
+	private transient SourceFunction.SourceContext<T> ctx;
 
 	public StreamSource(SourceFunction<T> sourceFunction) {
 		super(sourceFunction);
@@ -41,9 +42,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
-	public void run(Object lockingObject, Output<StreamRecord<T>> collector) throws Exception {
-
-		SourceFunction.SourceContext<T> ctx;
+	public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
 
 		if (userFunction instanceof EventTimeSourceFunction) {
 			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
@@ -64,6 +63,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 	public void cancel() {
 		userFunction.cancel();
+		ctx.close();
 	}
 
 	/**
@@ -190,9 +190,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 					// don't have watermarks that creep along at different intervals because
 					// the machine clocks are out of sync
 					long watermarkTime = currentTime - (currentTime % watermarkInterval);
-					if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+					if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
 						synchronized (lockingObject) {
-							if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+							if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
 								output.emitWatermark(new Watermark(watermarkTime));
 								lastWatermarkTime = watermarkTime;
 							}
@@ -210,7 +210,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 				output.collect(reuse.replace(element, currentTime));
 
 				long watermarkTime = currentTime - (currentTime % watermarkInterval);
-				if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+				if (currentTime > watermarkTime && watermarkTime - lastWatermarkTime >= watermarkInterval) {
 					output.emitWatermark(new Watermark(watermarkTime));
 					lastWatermarkTime = watermarkTime;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 8ce8a01..f50ddcd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -47,8 +47,14 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
- * <p>This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.</p>
+ * <p>
+ * This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.
+ *
+ * <p>
+ * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently
+ * with the timer callback or other things.
  * 
  * @param <IN> The type of the record that can be read with this record reader.
  */
@@ -118,9 +124,9 @@ public class StreamInputProcessor<IN> {
 		}
 		lastEmittedWatermark = Long.MIN_VALUE;
 	}
-	
-	
-	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
+
+	@SuppressWarnings("unchecked")
+	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -147,19 +153,22 @@ public class StreamInputProcessor<IN> {
 							}
 							if (newMinWatermark > lastEmittedWatermark) {
 								lastEmittedWatermark = newMinWatermark;
-								streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
+								synchronized (lock) {
+									streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
+								}
 							}
 						}
 						continue;
-					}
-					else {
+					} else {
 						// now we can do the actual processing
 						StreamRecord<IN> record = recordOrWatermark.asRecord();
 						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
-						if (ctx != null) {
-							ctx.setNextInput(record);
+						synchronized (lock) {
+							if (ctx != null) {
+								ctx.setNextInput(record);
+							}
+							streamOperator.processElement(record);
 						}
-						streamOperator.processElement(record);
 						return true;
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 6322cc8..882037e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -52,6 +52,11 @@ import java.util.Collection;
  * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
  * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
  *
+ * <p>
+ * Forwarding elements or watermarks must be protected by synchronizing on the given lock
+ * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently
+ * with the timer callback or other things.
+ *
  * @param <IN1> The type of the records that arrive on the first input
  * @param <IN2> The type of the records that arrive on the second input
  */
@@ -151,8 +156,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		Arrays.fill(watermarks2, Long.MIN_VALUE);
 		lastEmittedWatermark2 = Long.MIN_VALUE;
 	}
-	
-	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
+
+	@SuppressWarnings("unchecked")
+	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Object lock) throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -175,11 +181,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					if (currentChannel < numInputChannels1) {
 						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock);
 							continue;
 						}
 						else {
-							streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
+							synchronized (lock) {
+								streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
+							}
 							return true;
 
 						}
@@ -187,11 +195,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					else {
 						StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel);
+							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
 							continue;
 						}
 						else {
-							streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
+							synchronized (lock) {
+								streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
+							}
 							return true;
 						}
 					}
@@ -224,7 +234,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 	}
 
-	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex) throws Exception {
+	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex, Object lock) throws Exception {
 		if (channelIndex < numInputChannels1) {
 			long watermarkMillis = mark.getTimestamp();
 			if (watermarkMillis > watermarks1[channelIndex]) {
@@ -235,7 +245,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 				}
 				if (newMinWatermark > lastEmittedWatermark1) {
 					lastEmittedWatermark1 = newMinWatermark;
-					operator.processWatermark1(new Watermark(lastEmittedWatermark1));
+					synchronized (lock) {
+						operator.processWatermark1(new Watermark(lastEmittedWatermark1));
+					}
 				}
 			}
 		} else {
@@ -249,7 +261,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 				}
 				if (newMinWatermark > lastEmittedWatermark2) {
 					lastEmittedWatermark2 = newMinWatermark;
-					operator.processWatermark2(new Watermark(lastEmittedWatermark2));
+					synchronized (lock) {
+						operator.processWatermark2(new Watermark(lastEmittedWatermark2));
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
deleted file mode 100644
index 7528eb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
- * which means that never more than one trigger will be executed at the same time.
- * <p>
- * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
- * different times, the one scheduled for the later time will be executed after the one scheduled for the
- * earlier time.
- */
-public class TriggerTimer {
-	
-	/** The thread group that holds all trigger timer threads */
-	public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers");
-	
-	/** The executor service that */
-	private final ScheduledExecutorService scheduler;
-
-
-	/**
-	 * Creates a new trigger timer, where the timer thread has the default name "TriggerTimer Thread".
-	 */
-	public TriggerTimer() {
-		this("TriggerTimer Thread");
-	}
-
-	/**
-	 * Creates a new trigger timer, where the timer thread has the given name.
-	 * 
-	 * @param triggerName The name for the trigger thread.
-	 */
-	public TriggerTimer(String triggerName) {
-		this.scheduler = Executors.newSingleThreadScheduledExecutor(
-				new DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName));
-	}
-
-	/**
-	 * Schedules a new trigger event. The trigger event will occur roughly at the given timestamp.
-	 * If the timestamp is in the past (or now), the trigger will be queued for immediate execution. Note that other
-	 * triggers that are to be executed now will be executed before this trigger.
-	 * 
-	 * @param target The target to be triggered.
-	 * @param timestamp The timestamp when the trigger should occur, and the timestamp given
-	 *                  to the trigger-able target.
-	 */
-	public void scheduleTriggerAt(Triggerable target, long timestamp) {
-		long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
-		
-		scheduler.schedule(
-				new TriggerTask(target, timestamp),
-				delay,
-				TimeUnit.MILLISECONDS);
-	}
-
-	/**
-	 * Shuts down the trigger timer, canceling all pending triggers and stopping the trigger thread.
-	 */
-	public void shutdown() {
-		scheduler.shutdownNow();
-	}
-
-	/**
-	 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
-	 * shutdown method was never called.
-	 * <p>
-	 * This should not be relied upon! It will cause shutdown to happen much later than if manual
-	 * shutdown is attempted, and cause threads to linger for longer than needed.
-	 */
-	@Override
-	@SuppressWarnings("FinalizeDoesntCallSuperFinalize")
-	protected void finalize() {
-		shutdown();
-	}
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Internal task that is invoked by the scheduled executor and triggers the target.
-	 */
-	private static final class TriggerTask implements Runnable {
-		
-		private final Triggerable target;
-		private final long timestamp;
-
-		TriggerTask(Triggerable target, long timestamp) {
-			this.target = target;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public void run() {
-			target.trigger(timestamp);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
index 626f087..ac1a543 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
@@ -34,5 +34,5 @@ public interface Triggerable {
 	 * 
 	 * @param timestamp The timestamp for which the trigger event was scheduled.
 	 */
-	void trigger(long timestamp);
+	void trigger(long timestamp) throws Exception ;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
index f5f576d..2e926bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,12 +24,10 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.TriggerTimer;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -41,9 +39,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	
 	private static final long MIN_SLIDE_TIME = 50;
 	
-	
-	private final SerializableObject lock = new SerializableObject();
-
 	// ----- fields for operator parametrization -----
 	
 	private final Function function;
@@ -60,14 +55,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	
 	private transient TimestampedCollector<OUT> out;
 	
-	private transient TriggerTimer triggerTimer;
-	
 	private transient long nextEvaluationTime;
 	private transient long nextSlideTime;
 	
-	private transient volatile Throwable asyncError;
-	
-	
 	protected AbstractAlignedProcessingTimeWindowOperator(
 			Function function,
 			KeySelector<IN, KEY> keySelector,
@@ -123,47 +113,31 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 		nextEvaluationTime = now + windowSlide - (now % windowSlide);
 		nextSlideTime = now + paneSize - (now % paneSize);
 		
-		// start the trigger timer
-		triggerTimer = new TriggerTimer("Trigger for " + getRuntimeContext().getTaskName());
-		
-		// schedule the first trigger
-		triggerTimer.scheduleTriggerAt(this, Math.min(nextEvaluationTime, nextSlideTime));
+		getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
 	}
 
 	@Override
 	public void close() throws Exception {
-		// acquire the lock during shutdown, to prevent trigger calls at the same time
-		synchronized (lock) {
-			final long finalWindowTimestamp = nextEvaluationTime;
-			
-			// early stop the triggering thread, so it does not attempt to return any more data
-			stopTriggers();
+		final long finalWindowTimestamp = nextEvaluationTime;
 
-			// make sure we had no asynchronous error so far
-			checkErroneous();
-			
-			// emit the remaining data
-			computeWindow(finalWindowTimestamp);
-		}
+		// early stop the triggering thread, so it does not attempt to return any more data
+		stopTriggers();
+
+		// emit the remaining data
+		computeWindow(finalWindowTimestamp);
 	}
 
 	@Override
 	public void dispose() {
 		// acquire the lock during shutdown, to prevent trigger calls at the same time
-		synchronized (lock) {
-			// fail-safe stop of the triggering thread (in case of an error)
-			stopTriggers();
-			
-			// release the panes
-			panes.dispose();
-		}
+		// fail-safe stop of the triggering thread (in case of an error)
+		stopTriggers();
+
+		// release the panes
+		panes.dispose();
 	}
 	
 	private void stopTriggers() {
-		if (triggerTimer != null) {
-			triggerTimer.shutdown();
-		}
-
 		// reset the action timestamps. this makes sure any pending triggers will not evaluate
 		nextEvaluationTime = -1L;
 		nextSlideTime = -1L;
@@ -175,10 +149,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		synchronized (lock) {
-			checkErroneous();
-			panes.addElementToLatestPane(element.getValue());
-		}
+		panes.addElementToLatestPane(element.getValue());
 	}
 
 	@Override
@@ -187,42 +158,24 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	}
 
 	@Override
-	public void trigger(long timestamp) {
-		synchronized (lock) {
-			// first we check if we actually trigger the window function
-			if (timestamp == nextEvaluationTime) {
-				// compute and output the results
-				try {
-					computeWindow(timestamp);
-				}
-				catch (Throwable t) {
-					this.asyncError = t;
-				}
+	public void trigger(long timestamp) throws Exception {
+		// first we check if we actually trigger the window function
+		if (timestamp == nextEvaluationTime) {
+			// compute and output the results
+			computeWindow(timestamp);
 
-				nextEvaluationTime += windowSlide;
-			}
-			
-			// check if we slide the panes by one. this may happen in addition to the
-			// window computation, or just by itself
-			if (timestamp == nextSlideTime) {
-				try {
-					panes.slidePanes(numPanesPerWindow);
-				}
-				catch (Throwable t) {
-					this.asyncError = t;
-				}
-				nextSlideTime += paneSize;
-			}
-			
-			long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-			triggerTimer.scheduleTriggerAt(this, nextTriggerTime);
+			nextEvaluationTime += windowSlide;
 		}
-	}
-	
-	private void checkErroneous() throws Exception {
-		if (asyncError != null) {
-			throw new Exception("Error while computing and producing window result", asyncError);
+
+		// check if we slide the panes by one. this may happen in addition to the
+		// window computation, or just by itself
+		if (timestamp == nextSlideTime) {
+			panes.slidePanes(numPanesPerWindow);
+			nextSlideTime += paneSize;
 		}
+
+		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
+		getRuntimeContext().registerTimer(nextTriggerTime, this);
 	}
 	
 	private void computeWindow(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 8ef02b2..89eac92 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -30,7 +30,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 	
 	private volatile boolean running = true;
 
-	
 	@Override
 	public void init() throws Exception {
 		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
@@ -53,7 +52,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	protected void run() throws Exception {
-		while (running && inputProcessor.processInput(streamOperator));
+		while (running && inputProcessor.processInput(streamOperator, lock)) {
+			if (timerException != null) {
+				throw timerException;
+			}
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e704e32..fc221f8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  * One important aspect of this is that the checkpointing and the emission of elements must never
  * occur at the same time. The execution must be serial. This is achieved by having the contract
  * with the StreamFunction that it must only modify its state or emit elements in
- * a synchronized block that locks on the checkpointLock Object. Also, the modification of the state
+ * a synchronized block that locks on the lock Object. Also, the modification of the state
  * and the emission of elements must happen in the same block of code that is protected by the
  * synchronized block.
  *
@@ -52,8 +52,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 	protected void run() throws Exception {
 		final Object checkpointLock = getCheckpointLock();
 		
-		final SourceOutput<StreamRecord<OUT>> output = 
-				new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(outputHandler.getOutput(), checkpointLock);
 		
 		streamOperator.run(checkpointLock, output);
 	}
@@ -65,11 +64,18 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
 	// ------------------------------------------------------------------------
 	
-	// TODO:
-	// does this help with anything? The losk should be already held by the source function that
-	// emits. If that one does not hold the lock, then this does not help either.
-	
-	private static class SourceOutput<T> implements Output<T> {
+	/**
+	 * Special output for sources that ensures that sources synchronize on  the lock object before
+	 * emitting elements.
+	 *
+	 * <p>
+	 * This is required to ensure that no concurrent method calls on operators later in the chain
+	 * can occur. When operators register a timer the timer callback is synchronized
+	 * on the same lock object.
+	 *
+	 * @param <T> The type of elements emitted by the source.
+	 */
+	private class SourceOutput<T> implements Output<T> {
 		
 		private final Output<T> output;
 		private final Object lockObject;
@@ -89,6 +95,9 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 		@Override
 		public void collect(T record) {
 			synchronized (lockObject) {
+				if (timerException != null) {
+					throw timerException;
+				}
 				output.collect(record);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d357a4d..1b35350 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
@@ -38,6 +41,7 @@ import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
@@ -45,12 +49,15 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.state.WrapperStateHandle;
 
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * 
+ * Base Invokable for all {@code StreamTasks}. A {@code StreamTask} processes input and forwards
+ * elements and watermarks to a {@link StreamOperator}.
+ *
  * <pre>
  *     
  *  -- registerInputOutput()
@@ -68,6 +75,11 @@ import org.slf4j.LoggerFactory;
  *        +----> common cleanup
  *        +----> operator specific cleanup()
  * </pre>
+ *
+ * <p>
+ * {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
+ * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
+ * are called concurrently.
  * 
  * @param <OUT>
  * @param <O>
@@ -76,8 +88,11 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
-	
-	private final Object checkpointLock = new Object();
+	/**
+	 * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
+	 * we don't have concurrent method calls.
+	 */
+	protected final Object lock = new Object();
 
 	private final EventListener<CheckpointBarrier> checkpointBarrierListener;
 	
@@ -88,7 +103,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	protected StreamConfig configuration;
 
 	protected ClassLoader userClassLoader;
-	
+
+	/** The thread group that holds all trigger timer threads */
+	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
+
+	/** The executor service that */
+	private ScheduledExecutorService timerService;
+
+	/**
+	 * This field is used to forward an exception that is caught in the timer thread. Subclasses
+	 * must ensure that exceptions stored here get thrown on the actual execution Thread.
+	 */
+	protected volatile TimerException timerException = null;
+
 	protected OutputHandler<OUT> outputHandler;
 
 	protected O streamOperator;
@@ -136,7 +163,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
 		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
 
-		outputHandler = new OutputHandler<OUT>(this, accumulatorMap, reporter);
+		outputHandler = new OutputHandler<>(this, accumulatorMap, reporter);
 
 		if (streamOperator != null) {
 			// IterationHead and IterationTail don't have an Operator...
@@ -148,7 +175,10 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 
 		hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
-		
+
+		this.timerService = Executors.newSingleThreadScheduledExecutor(
+				new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+
 		// operator specific initialization
 		init();
 		
@@ -175,7 +205,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			// make sure no further checkpoint and notification actions happen.
 			// we make sure that no other thread is currently in the locked scope before
 			// we close the operators by trying to acquire the checkpoint scope lock
-			synchronized (checkpointLock) {}
+			synchronized (lock) {}
 			
 			// this is part of the main logic, so if this fails, the task is considered failed
 			closeAllOperators();
@@ -190,6 +220,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 		finally {
 			isRunning = false;
+
+			timerService.shutdown();
 			
 			// release the output resources. this method should never fail.
 			if (outputHandler != null) {
@@ -259,6 +291,25 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 	}
 
+	/**
+	 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
+	 * shutdown method was never called.
+	 *
+	 * <p>
+	 * This should not be relied upon! It will cause shutdown to happen much later than if manual
+	 * shutdown is attempted, and cause threads to linger for longer than needed.
+	 */
+	@Override
+	@SuppressWarnings("FinalizeDoesntCallSuperFinalize")
+	protected void finalize() {
+		if (timerService != null) {
+			if (!timerService.isTerminated()) {
+				LOG.warn("Timer service was not shut down. Shutting down in finalize().");
+			}
+			timerService.shutdown();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Access to properties and utilities
 	// ------------------------------------------------------------------------
@@ -272,7 +323,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	}
 
 	public Object getCheckpointLock() {
-		return checkpointLock;
+		return lock;
 	}
 
 	// ------------------------------------------------------------------------
@@ -303,12 +354,11 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 		
-		synchronized (checkpointLock) {
+		synchronized (lock) {
 			if (isRunning) {
 				try {
 					// We wrap the states of the chained operators in a list, marking non-stateful operators with null
-					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates
-							= new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();
+					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
 
 					// A wrapper handle is created for the List of statehandles
 					WrapperStateHandle stateHandle;
@@ -352,7 +402,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		synchronized (checkpointLock) {
+		synchronized (lock) {
 			if (isRunning) {
 				for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
 					if (chainedOperator instanceof StatefulStreamOperator) {
@@ -412,6 +462,18 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		JOBMANAGER, FILESYSTEM
 	}
 
+	/**
+	 * Registers a timer.
+	 */
+	public void registerTimer(final long timestamp, final Triggerable target) {
+		long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
+
+		timerService.schedule(
+				new TriggerTask(this, lock, target, timestamp),
+				delay,
+				TimeUnit.MILLISECONDS);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -420,7 +482,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
 
 		return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(),
-				statePartitioner, getStateHandleProvider(), accumulatorMap);
+				statePartitioner, getStateHandleProvider(), accumulatorMap, this);
 	}
 	
 	@Override
@@ -446,4 +508,36 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			}
 		}
 	}
+
+	/**
+	 * Internal task that is invoked by the timer service and triggers the target.
+	 */
+	private static final class TriggerTask implements Runnable {
+
+		private final Object lock;
+		private final Triggerable target;
+		private final long timestamp;
+		private final StreamTask<?, ?> task;
+
+		TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp) {
+			this.task = task;
+			this.lock = lock;
+			this.target = target;
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public void run() {
+			synchronized (lock) {
+				try {
+					target.trigger(timestamp);
+				} catch (Throwable t) {
+					LOG.error("Caught exception while processing timer.", t);
+					if (task.timerException == null) {
+						task.timerException = new TimerException(t);
+					}
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index b82888e..0ac352b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -52,16 +53,21 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
 	private final KeySelector<?, ?> statePartitioner;
 	private final StateHandleProvider<Serializable> provider;
-	
-	
+
+	/**
+	 * We need access to the {@link StreamTask} to register timer callbacks.
+	 */
+	private final StreamTask<?, ?> streamTask;
+
 	@SuppressWarnings("unchecked")
 	public StreamingRuntimeContext(
 			Environment env,
 			ExecutionConfig executionConfig,
 			KeySelector<?, ?> statePartitioner,
 			StateHandleProvider<?> provider,
-			Map<String, Accumulator<?, ?>> accumulatorMap) {
-		
+			Map<String, Accumulator<?, ?>> accumulatorMap,
+			StreamTask<?, ?> streamTask) {
+
 		super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
 				env.getUserClassLoader(), executionConfig,
 				env.getDistributedCacheEntries(), accumulatorMap);
@@ -71,6 +77,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 		this.states = new HashMap<>();
 		this.partitionedStates = new LinkedList<>();
 		this.provider = (StateHandleProvider<Serializable>) provider;
+		this.streamTask = streamTask;
 	}
 
 	/**
@@ -167,6 +174,18 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	}
 
 	/**
+	 * Register a timer callback. At the specified time the
+	 * {@code Triggerable } will be invoked. This call is guaranteed to not happen
+	 * concurrently with method calls on the operator.
+	 *
+	 * @param time The absolute time in milliseconds.
+	 * @param target The target to be triggered.
+	 */
+	public void registerTimer(long time, Triggerable target) {
+		streamTask.registerTimer(time, target);
+	}
+
+	/**
 	 * Sets the next input of the underlying operators, used to access
 	 * partitioned states.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
new file mode 100644
index 0000000..3e1c1e5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * {@code RuntimeException} for wrapping exceptions that are thrown in the timer callback of
+ * the timer service in {@link StreamTask}.
+ */
+public class TimerException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public TimerException(Throwable cause) {
+		super(cause);
+	}
+
+	@Override
+	public String toString() {
+		return "TimerException{" + getCause() + "}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 5d0497d..25f1a76 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -75,7 +75,11 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	protected void run() throws Exception {
-		while (running && inputProcessor.processInput(streamOperator));
+		while (running && inputProcessor.processInput(streamOperator, lock)) {
+			if (timerException != null) {
+				throw timerException;
+			}
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 4aec723..b8b4c13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -171,7 +171,8 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 				new ExecutionConfig(),
 				partitioner,
 				new LocalStateHandleProvider<Serializable>(),
-				new HashMap<String, Accumulator<?, ?>>());
+				new HashMap<String, Accumulator<?, ?>>(),
+				null);
 
 		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
new file mode 100644
index 0000000..2aed041
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+public class StreamTaskTimerTest {
+
+	@Test
+	public void testOpenCloseAndTimestamps() throws Exception {
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<>(null);
+		streamConfig.setStreamOperator(mapOperator);
+
+		testHarness.invoke();
+
+		// first one spawns thread
+		mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {}
+		});
+
+		assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		// thread needs to die in time
+		long deadline = System.currentTimeMillis() + 4000;
+		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
+			Thread.sleep(10);
+		}
+
+		assertEquals("Trigger timer thread did not properly shut down",
+				0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+	}
+	
+	@Test
+	public void checkScheduledTimestampe() {
+		try {
+
+			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+			StreamConfig streamConfig = testHarness.getStreamConfig();
+			StreamMap<String, String> mapOperator = new StreamMap<>(null);
+			streamConfig.setStreamOperator(mapOperator);
+
+			testHarness.invoke();
+
+			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+			final long t1 = System.currentTimeMillis();
+			final long t2 = System.currentTimeMillis() - 200;
+			final long t3 = System.currentTimeMillis() + 100;
+			final long t4 = System.currentTimeMillis() + 200;
+
+			mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
+			mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
+			mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
+			mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
+
+
+			testHarness.endInput();
+			testHarness.waitForTaskCompletion();
+
+			long deadline = System.currentTimeMillis() + 5000;
+			while (errorRef.get() == null &&
+					ValidatingTriggerable.numInSequence < 4 &&
+					System.currentTimeMillis() < deadline)
+			{
+				Thread.sleep(100);
+			}
+
+			// handle errors
+			if (errorRef.get() != null) {
+				errorRef.get().printStackTrace();
+				fail(errorRef.get().getMessage());
+			}
+
+			assertEquals(4, ValidatingTriggerable.numInSequence);
+
+
+			// wait until the trigger thread is shut down. otherwise, the other tests may become unstable
+			deadline = System.currentTimeMillis() + 4000;
+			while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
+				Thread.sleep(10);
+			}
+
+			assertEquals("Trigger timer thread did not properly shut down",
+					0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static class ValidatingTriggerable implements Triggerable {
+		
+		static int numInSequence;
+		
+		private final AtomicReference<Throwable> errorRef;
+		
+		private final long expectedTimestamp;
+		private final int expectedInSequence;
+
+		private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
+			this.errorRef = errorRef;
+			this.expectedTimestamp = expectedTimestamp;
+			this.expectedInSequence = expectedInSequence;
+		}
+
+		@Override
+		public void trigger(long timestamp) {
+			try {
+				assertEquals(expectedTimestamp, timestamp);
+				assertEquals(expectedInSequence, numInSequence);
+				numInSequence++;
+			}
+			catch (Throwable t) {
+				errorRef.compareAndSet(null, t);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
deleted file mode 100644
index 1349cb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-
-public class TriggerTimerTest {
-	
-	@Test
-	public void testThreadGroupAndShutdown() {
-		try {
-			TriggerTimer timer = new TriggerTimer();
-			
-			// first one spawns thread
-			timer.scheduleTriggerAt(new Triggerable() {
-				@Override
-				public void trigger(long timestamp) {}
-			}, System.currentTimeMillis());
-			
-			assertEquals(1, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
-			
-			// thread needs to die in time
-			timer.shutdown();
-			
-			long deadline = System.currentTimeMillis() + 2000;
-			while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-				Thread.sleep(10);
-			}
-
-			assertEquals("Trigger timer thread did not properly shut down",
-					0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void checkScheduledTimestampe() {
-		try {
-			final TriggerTimer timer = new TriggerTimer();
-
-			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-			
-			final long t1 = System.currentTimeMillis();
-			final long t2 = System.currentTimeMillis() - 200;
-			final long t3 = System.currentTimeMillis() + 100;
-			final long t4 = System.currentTimeMillis() + 200;
-			
-			timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t1, 0), t1);
-			timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t2, 1), t2);
-			timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t3, 2), t3);
-			timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t4, 3), t4);
-			
-			long deadline = System.currentTimeMillis() + 5000;
-			while (errorRef.get() == null &&
-					ValidatingTriggerable.numInSequence < 4 &&
-					System.currentTimeMillis() < deadline)
-			{
-				Thread.sleep(100);
-			}
-			
-			// handle errors
-			if (errorRef.get() != null) {
-				errorRef.get().printStackTrace();
-				fail(errorRef.get().getMessage());
-			}
-			
-			assertEquals(4, ValidatingTriggerable.numInSequence);
-			
-			timer.shutdown();
-			
-			// wait until the trigger thread is shut down. otherwise, the other tests may become unstable
-			deadline = System.currentTimeMillis() + 2000;
-			while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-				Thread.sleep(10);
-			}
-
-			assertEquals("Trigger timer thread did not properly shut down", 
-					0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	private static class ValidatingTriggerable implements Triggerable {
-		
-		static int numInSequence;
-		
-		private final AtomicReference<Throwable> errorRef;
-		
-		private final long expectedTimestamp;
-		private final int expectedInSequence;
-
-		private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
-			this.errorRef = errorRef;
-			this.expectedTimestamp = expectedTimestamp;
-			this.expectedInSequence = expectedInSequence;
-		}
-
-		@Override
-		public void trigger(long timestamp) {
-			try {
-				assertEquals(expectedTimestamp, timestamp);
-				assertEquals(expectedInSequence, numInSequence);
-				numInSequence++;
-			}
-			catch (Throwable t) {
-				errorRef.compareAndSet(null, t);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 685939b..19801f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,17 +22,24 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TriggerTimer;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.*;
 import static org.junit.Assert.*;
@@ -71,7 +78,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	public void checkNoTriggerThreadsRunning() {
 		// make sure that all the threads we trigger are shut down
 		long deadline = System.currentTimeMillis() + 5000;
-		while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
+		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
 			try {
 				Thread.sleep(10);
 			}
@@ -79,7 +86,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		assertTrue("Not all trigger threads where properly shut down",
-				TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0);
+				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -281,12 +288,37 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testTumblingWindowSingleElements() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
 
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
+			final Object lock = new Object();
+
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+					timerService.schedule(
+							new Callable<Object>() {
+								@Override
+								public Object call() throws Exception {
+									synchronized (lock) {
+										target.trigger(timestamp);
+									}
+									return null;
+								}
+							},
+							timestamp - System.currentTimeMillis(),
+							TimeUnit.MILLISECONDS);
+					return null;
+				}
+			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
@@ -294,16 +326,22 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			op.setup(out, mockContext);
 			op.open(new Configuration());
 
-			op.processElement(new StreamRecord<Integer>(1));
-			op.processElement(new StreamRecord<Integer>(2));
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+			}
 			out.waitForNElements(2, 60000);
 
-			op.processElement(new StreamRecord<Integer>(3));
-			op.processElement(new StreamRecord<Integer>(4));
-			op.processElement(new StreamRecord<Integer>(5));
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(3));
+				op.processElement(new StreamRecord<Integer>(4));
+				op.processElement(new StreamRecord<Integer>(5));
+			}
 			out.waitForNElements(5, 60000);
 
-			op.processElement(new StreamRecord<Integer>(6));
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(6));
+			}
 			out.waitForNElements(6, 60000);
 			
 			List<Integer> result = out.getElements();
@@ -318,26 +356,55 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			timerService.shutdown();
 		}
 	}
 	
 	@Test
 	public void testSlidingWindowSingleElements() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
 
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
+			final Object lock = new Object();
+
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+					timerService.schedule(
+							new Callable<Object>() {
+								@Override
+								public Object call() throws Exception {
+									synchronized (lock) {
+										target.trigger(timestamp);
+									}
+									return null;
+								}
+							},
+							timestamp - System.currentTimeMillis(),
+							TimeUnit.MILLISECONDS);
+					return null;
+				}
+			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
 			op.open(new Configuration());
-			
-			op.processElement(new StreamRecord<Integer>(1));
-			op.processElement(new StreamRecord<Integer>(2));
+
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+			}
 
 			// each element should end up in the output three times
 			// wait until the elements have arrived 6 times in the output
@@ -355,6 +422,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			timerService.shutdown();
 		}
 	}
 	
@@ -395,43 +464,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
-	public void testPropagateExceptionsFromTrigger() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100);
-			
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 50, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			try {
-				int num = 0;
-				while (num < Integer.MAX_VALUE) {
-					op.processElement(new StreamRecord<Integer>(num++));
-					Thread.sleep(1);
-				}
-				fail("This should really have failed with an exception quite a while ago...");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertTrue(e.getCause().getMessage().contains("Artificial Test Exception"));
-			}
-			
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
 	public void testPropagateExceptionsFromClose() {
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 464df32..0ff974c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,22 +22,33 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TriggerTimer;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -70,7 +81,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void checkNoTriggerThreadsRunning() {
 		// make sure that all the threads we trigger are shut down
 		long deadline = System.currentTimeMillis() + 5000;
-		while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
+		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
 			try {
 				Thread.sleep(10);
 			}
@@ -78,7 +89,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		assertTrue("Not all trigger threads where properly shut down",
-				TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0);
+				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -222,13 +233,38 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
-	public void testTumblingWindowDuplicateElements() {
+	public void  testTumblingWindowDuplicateElements() {
+
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
 
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
+
+			final Object lock = new Object();
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+					timerService.schedule(
+							new Callable<Object>() {
+								@Override
+								public Object call() throws Exception {
+									synchronized (lock) {
+										target.trigger(timestamp);
+									}
+									return null;
+								}
+							},
+							timestamp - System.currentTimeMillis(),
+							TimeUnit.MILLISECONDS);
+					return null;
+				}
+			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
 			
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -245,8 +281,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			while (window <= numWindows) {
 				long nextTime = op.getNextEvaluationTime();
 				int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-				
-				op.processElement(new StreamRecord<Integer>(val));
+
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(val));
+				}
 				
 				if (nextTime != previousNextTime) {
 					window++;
@@ -272,6 +310,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			timerService.shutdown();
 		}
 	}
 
@@ -332,21 +372,47 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testSlidingWindowSingleElements() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
 
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
+			final Object lock = new Object();
+			doAnswer(new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+					timerService.schedule(
+							new Callable<Object>() {
+								@Override
+								public Object call() throws Exception {
+									synchronized (lock) {
+										target.trigger(timestamp);
+									}
+									return null;
+								}
+							},
+							timestamp - System.currentTimeMillis(),
+							TimeUnit.MILLISECONDS);
+					return null;
+				}
+			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
 			op.open(new Configuration());
-			
-			op.processElement(new StreamRecord<Integer>(1));
-			op.processElement(new StreamRecord<Integer>(2));
+
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+			}
 
 			// each element should end up in the output three times
 			// wait until the elements have arrived 6 times in the output
@@ -364,6 +430,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			timerService.shutdown();
 		}
 	}
 	
@@ -403,51 +471,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
-	public void testPropagateExceptionsFromTrigger() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
-
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 200, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			try {
-				long nextWindowTime = op.getNextEvaluationTime();
-				int val = 0;
-				for (int num = 0; num < Integer.MAX_VALUE; num++) {
-					op.processElement(new StreamRecord<Integer>(val++));
-					Thread.sleep(1);
-					
-					// when the window has advanced, reset the value, to generate the same values
-					// in the next pane again. This causes the aggregation on trigger to reduce values
-					if (op.getNextEvaluationTime() != nextWindowTime) {
-						nextWindowTime = op.getNextEvaluationTime();
-						val = 0;
-					}
-				}
-				fail("This should really have failed with an exception quite a while ago...");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertTrue(e.getCause().getMessage().contains("Artificial Test Exception"));
-			}
-			
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
 	public void testPropagateExceptionsFromProcessElement() {
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();


Mime
View raw message