flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/4] flink git commit: [FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers
Date Mon, 27 Jun 2016 15:50:37 GMT
[FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers

Introduces a custom TimeServiceProvider to the StreamTask.
This is responsible for defining and updating the current
processing time for a task and handling all related actions,
such as registering timers for actions to be executed in
the future.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b5a7890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b5a7890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b5a7890

Branch: refs/heads/master
Commit: 4b5a789000a903c84f89b668a4cbd2ba1397e758
Parents: cb2b76d
Author: kl0u <kkloudas@gmail.com>
Authored: Thu May 12 14:16:14 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jun 27 17:16:12 2016 +0200

----------------------------------------------------------------------
 .../kafka/internals/AbstractFetcher.java        |   4 +-
 .../kafka/testutils/MockRuntimeContext.java     |  44 ++--
 .../api/operators/AbstractStreamOperator.java   |   4 +
 .../api/operators/StreamingRuntimeContext.java  |  10 +-
 .../assigners/EventTimeSessionWindows.java      |   2 +-
 .../api/windowing/assigners/GlobalWindows.java  |   2 +-
 .../assigners/ProcessingTimeSessionWindows.java |   5 +-
 .../assigners/SlidingEventTimeWindows.java      |   2 +-
 .../assigners/SlidingProcessingTimeWindows.java |   4 +-
 .../assigners/TumblingEventTimeWindows.java     |   2 +-
 .../TumblingProcessingTimeWindows.java          |   4 +-
 .../api/windowing/assigners/WindowAssigner.java |   3 +-
 .../assigners/WindowAssignerContext.java        |  37 ++++
 .../triggers/ContinuousEventTimeTrigger.java    |   2 +
 .../ContinuousProcessingTimeTrigger.java        |   8 +-
 .../triggers/ProcessingTimeTrigger.java         |   2 +-
 .../api/windowing/triggers/Trigger.java         |   9 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../windowing/EvictingWindowOperator.java       |   6 +-
 .../operators/windowing/WindowOperator.java     |  29 ++-
 .../tasks/DefaultTimeServiceProvider.java       |  60 +++++
 .../streaming/runtime/tasks/StreamTask.java     | 113 +++++++---
 .../runtime/tasks/TestTimeServiceProvider.java  |  91 ++++++++
 .../runtime/tasks/TimeServiceProvider.java      |  41 ++++
 .../runtime/operators/StreamTaskTimerTest.java  |  57 ++++-
 .../operators/windowing/WindowOperatorTest.java | 222 ++++++++++++++++++-
 .../runtime/tasks/StreamTaskTestHarness.java    |   7 +
 .../util/OneInputStreamOperatorTestHarness.java |  96 ++++++--
 28 files changed, 762 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8183575..f9d2e64 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -425,7 +425,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		//-------------------------------------------------
 		
 		public void start() {
-			triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+			triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
 		}
 		
 		@Override
@@ -454,7 +454,7 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 			
 			// schedule the next watermark
-			triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+			triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 1ac2ef5..f1bb157 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -38,15 +38,16 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
@@ -57,16 +58,27 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	private final ExecutionConfig execConfig;
 	private final Object checkpointLock;
 
-	private ScheduledExecutorService timer;
-	
+	private final TimeServiceProvider timerService;
+
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
 		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
 	}
-	
+
+	public MockRuntimeContext(
+		int numberOfParallelSubtasks, int indexOfThisSubtask,
+		ExecutionConfig execConfig,
+		Object checkpointLock) {
+
+		this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
+			DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
+	}
+
 	public MockRuntimeContext(
-			int numberOfParallelSubtasks, int indexOfThisSubtask, 
+			int numberOfParallelSubtasks, int indexOfThisSubtask,
 			ExecutionConfig execConfig,
-			Object checkpointLock) {
+			Object checkpointLock,
+			TimeServiceProvider timerService) {
+
 		super(new MockStreamOperator(),
 				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
 				Collections.<String, Accumulator<?, ?>>emptyMap());
@@ -75,6 +87,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		this.indexOfThisSubtask = indexOfThisSubtask;
 		this.execConfig = execConfig;
 		this.checkpointLock = checkpointLock;
+		this.timerService = timerService;
 	}
 
 	@Override
@@ -186,16 +199,17 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException();
 	}
-	
+
+	public long getCurrentProcessingTime() {
+		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
+		return timerService.getCurrentProcessingTime();
+	}
+
 	@Override
 	public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
-		if (timer == null) {
-			timer = Executors.newSingleThreadScheduledExecutor();
-		}
+		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
 		
-		final long delay = Math.max(time - System.currentTimeMillis(), 0);
-
-		return timer.schedule(new Runnable() {
+		return timerService.registerTimer(time, new Runnable() {
 			@Override
 			public void run() {
 				synchronized (checkpointLock) {
@@ -207,7 +221,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 					}
 				}
 			}
-		}, delay, TimeUnit.MILLISECONDS);
+		});
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 3efc469..05fc158 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -260,6 +260,10 @@ public abstract class AbstractStreamOperator<OUT>
 		return container.registerTimer(time, target);
 	}
 
+	protected long getCurrentProcessingTime() {
+		return container.getCurrentProcessingTime();
+	}
+
 	/**
 	 * Creates a partitioned state handle, using the state backend configured for this task.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index a858b4c..6a09492 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -93,7 +93,15 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
 		return operator.registerTimer(time, target);
 	}
-	
+
+	/**
+	 * Returns the current processing time as defined by the task's
+	 * {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
+	 */
+	public long getCurrentProcessingTime() {
+		return operator.getCurrentProcessingTime();
+	}
+
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index 64c14cd..e38f617 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -51,7 +51,7 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 71101f6..1c6284a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -43,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	private GlobalWindows() {}
 
 	@Override
-	public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
+	public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		return Collections.singletonList(GlobalWindow.get());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 0e1682d..52d1c03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -51,8 +51,9 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+		long currentProcessingTime = context.getCurrentProcessingTime();
+		return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 83511df..8fd0d25 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -58,7 +58,7 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		if (timestamp > Long.MIN_VALUE) {
 			List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
 			long lastStart = timestamp - timestamp % slide;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index d2b0707..6a03640 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -55,8 +55,8 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		timestamp = System.currentTimeMillis();
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+		timestamp = context.getCurrentProcessingTime();
 		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
 		long lastStart = timestamp - timestamp % slide;
 		for (long start = lastStart;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 70432a6..44464f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -54,7 +54,7 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		if (timestamp > Long.MIN_VALUE) {
 			// Long.MIN_VALUE is currently assigned when no timestamp is present
 			long start = timestamp - (timestamp % size);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 3ec55d0..ce36144 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -51,8 +51,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	}
 
 	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		final long now = System.currentTimeMillis();
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+		final long now = context.getCurrentProcessingTime();
 		long start = now - (now % size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 0b49bce..c25d6d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -50,8 +50,9 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	 *
 	 * @param element The element to which windows should be assigned.
 	 * @param timestamp The timestamp of the element.
+	 * @param context The {@link WindowAssignerContext} in which the assigner operates.
 	 */
-	public abstract Collection<W> assignWindows(T element, long timestamp);
+	public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
 
 	/**
 	 * Returns the default trigger associated with this {@code WindowAssigner}.

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
new file mode 100644
index 0000000..e3f51a2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A context provided to the {@link WindowAssigner} that allows it to query the
+ * current processing time. This is provided to the assigner by its containing
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
+ * which, in turn, gets it from the containing
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+public abstract class WindowAssignerContext {
+
+	/**
+	 * Returns the current processing time, as returned by
+	 * the {@link StreamTask#getCurrentProcessingTime()}.
+	 */
+	public abstract long getCurrentProcessingTime();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 5cb0e4d..02613f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -89,6 +89,8 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	@Override
 	public void clear(W window, TriggerContext ctx) throws Exception {
 		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		long timestamp = fireTimestamp.get();
+		ctx.deleteEventTimeTimer(timestamp);
 		fireTimestamp.clear();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c6e11b1..b224cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
+ * A {@link Trigger} that continuously fires based on a given time interval as measured by
+ * the clock of the machine on which the job is running.
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
@@ -52,7 +52,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
 		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
-		timestamp = System.currentTimeMillis();
+		timestamp = ctx.getCurrentProcessingTime();
 
 		if (fireTimestamp.get() == null) {
 			long start = timestamp - (timestamp % interval);
@@ -87,6 +87,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 	@Override
 	public void clear(W window, TriggerContext ctx) throws Exception {
 		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		long timestamp = fireTimestamp.get();
+		ctx.deleteProcessingTimeTimer(timestamp);
 		fireTimestamp.clear();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 06193cd..8ea6a43 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		ctx.registerProcessingTimeTimer(window.getEnd());
+		ctx.registerProcessingTimeTimer(window.maxTimestamp());
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index c9b9ff1..4d6c60f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.Serializable;
 
@@ -127,6 +128,12 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	public interface TriggerContext {
 
 		/**
+		 * Returns the current processing time, as returned by
+		 * the {@link StreamTask#getCurrentProcessingTime()}.
+		 */
+		long getCurrentProcessingTime();
+
+		/**
 		 * Returns the metric group for this {@link Trigger}. This is the same metric
 		 * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
 		 * function.
@@ -170,7 +177,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 		void deleteEventTimeTimer(long time);
 	
 		/**
-		 * Retrieves an {@link State} object that can be used to interact with
+		 * Retrieves a {@link State} object that can be used to interact with
 		 * fault-tolerant state that is scoped to the window and key of the current
 		 * trigger invocation.
 		 *

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 33a0407..657d973 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -63,7 +63,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
  */
 @Internal
 public class StreamInputProcessor<IN> {
-	
+
 	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -76,7 +76,7 @@ public class StreamInputProcessor<IN> {
 
 	private boolean isFinished;
 
-	
+
 
 	private final long[] watermarks;
 	private long lastEmittedWatermark;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index fa1c894..d82fc85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -83,10 +83,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-
 		Collection<W> elementWindows = windowAssigner.assignWindows(
-			element.getValue(),
-			element.getTimestamp());
+				element.getValue(),
+				element.getTimestamp(),
+				windowAssignerContext);
 
 		final K key = (K) getStateBackend().getCurrentKey();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index bad1a22..ad01a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -80,9 +81,9 @@ import static java.util.Objects.requireNonNull;
  *
  * <p>
  * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
- * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
- * is put into panes. A pane is the bucket of elements that have the same key and belong to the same
- * {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
+ * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
+ * is put into panes. A pane is the bucket of elements that have the same key and same
+ * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
  * {@code WindowAssigner}.
  *
  * <p>
@@ -160,6 +161,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected transient Context context = new Context(null, null);
 
+	protected transient WindowAssignerContext windowAssignerContext;
+
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
@@ -245,6 +248,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		context = new Context(null, null);
 
+		windowAssignerContext = new WindowAssignerContext() {
+			@Override
+			public long getCurrentProcessingTime() {
+				return WindowOperator.this.getCurrentProcessingTime();
+			}
+		};
+
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindowsByKey = new HashMap<>();
 		}
@@ -261,6 +271,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		processingTimeTimers = null;
 		processingTimeTimersQueue = null;
 		context = null;
+		windowAssignerContext = null;
 		mergingWindowsByKey = null;
 	}
 
@@ -273,16 +284,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		processingTimeTimers = null;
 		processingTimeTimersQueue = null;
 		context = null;
+		windowAssignerContext = null;
 		mergingWindowsByKey = null;
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-
 		Collection<W> elementWindows = windowAssigner.assignWindows(
-			element.getValue(),
-			element.getTimestamp());
+			element.getValue(), element.getTimestamp(), windowAssignerContext);
 
 		final K key = (K) getStateBackend().getCurrentKey();
 
@@ -669,6 +679,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		@Override
+		public long getCurrentProcessingTime() {
+			return WindowOperator.this.getCurrentProcessingTime();
+		}
+
+		@Override
 		public void registerProcessingTimeTimer(long time) {
 			Timer<K, W> timer = new Timer<>(time, key, window);
 			// make sure we only put one timer per key into the queue
@@ -676,7 +691,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				processingTimeTimersQueue.add(timer);
 				//If this is the first timer added for this timestamp register a TriggerTask
 				if (processingTimeTimerTimestamps.add(time, 1) == 0) {
-					ScheduledFuture<?> scheduledFuture= getRuntimeContext().registerTimer(time, WindowOperator.this);
+					ScheduledFuture<?> scheduledFuture = WindowOperator.this.registerTimer(time, WindowOperator.this);
 					processingTimeTimerFutures.put(time, scheduledFuture);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
new file mode 100644
index 0000000..b803b82
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
+ */
+public class DefaultTimeServiceProvider extends TimeServiceProvider {
+
+	/** The executor service that schedules and calls the triggers of this task. */
+	private final ScheduledExecutorService timerService;
+
+	public static DefaultTimeServiceProvider create (ScheduledExecutorService executor) {
+		return new DefaultTimeServiceProvider(executor);
+	}
+
+	private DefaultTimeServiceProvider(ScheduledExecutorService threadPoolExecutor) {
+		this.timerService = threadPoolExecutor;
+	}
+
+	@Override
+	public long getCurrentProcessingTime() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
+		return timerService.schedule(target, delay, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void shutdownService() throws Exception {
+		if (!timerService.isTerminated()) {
+			StreamTask.LOG.info("Timer service is shutting down.");
+		}
+		timerService.shutdownNow();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 444245c..a5de312 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -56,13 +56,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing that is deployed
  * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
  * the Task's operator chain. Operators that are chained together execute synchronously in the
- * same thread and hence on the same stream partition. A common case for these chaines
+ * same thread and hence on the same stream partition. A common case for these chains
  * are successive map/flatmap/filter tasks.
  * 
  * <p>The task chain contains one "head" operator and multiple chained operators. 
@@ -127,10 +126,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	/** The class loader used to load dynamic classes of a job */
 	private ClassLoader userClassLoader;
-	
-	/** The executor service that schedules and calls the triggers of this task*/
-	private ScheduledThreadPoolExecutor timerService;
-	
+
+	/**
+	 * The internal {@link TimeServiceProvider} used to define the current
+	 * processing time (default = {@code System.currentTimeMillis()}) and
+	 * register timers for tasks to be executed in the future.
+	 */
+	private TimeServiceProvider timerService;
+
 	/** The map of user-defined accumulators of this task */
 	private Map<String, Accumulator<?, ?>> accumulatorMap;
 	
@@ -172,7 +175,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	//  Core work methods of the Stream Task
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * Allows the user to specify a custom {@link TimeServiceProvider}.
+	 * By default, a {@link DefaultTimeServiceProvider} is used.
+	 * Changing it can be useful for testing processing time functionality, such as
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}
+	 * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}.
+	 * */
+	public void setTimeService(TimeServiceProvider timeProvider) {
+		if (timeProvider == null) {
+			throw new RuntimeException("The timeProvider cannot be set to null.");
+		}
+		timerService = timeProvider;
+	}
+
+	public long getCurrentProcessingTime() {
+		return timerService.getCurrentProcessingTime();
+	}
+
 	@Override
 	public final void invoke() throws Exception {
 
@@ -185,6 +206,19 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			configuration = new StreamConfig(getTaskConfiguration());
 			accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 
+			// if the clock is not already set, then assign a default TimeServiceProvider
+			if (timerService == null) {
+
+				ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
+					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+
+				// allow trigger tasks to be removed if all timers for
+				// that timestamp are removed by user
+				executor.setRemoveOnCancelPolicy(true);
+
+				timerService = DefaultTimeServiceProvider.create(executor);
+			}
+
 			headOperator = configuration.getStreamOperator(userClassLoader);
 			operatorChain = new OperatorChain<>(this, headOperator, 
 						getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -193,10 +227,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
 			}
 
-			timerService =new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
-			// allow trigger tasks to be removed if all timers for that timestamp are removed by user
-			timerService.setRemoveOnCancelPolicy(true);
-
 			getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
 				@Override
 				public Long getValue() {
@@ -265,7 +295,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			// stop all timers and threads
 			if (timerService != null) {
 				try {
-					timerService.shutdownNow();
+					timerService.shutdownService();
 				}
 				catch (Throwable t) {
 					// catch and log the exception to not replace the original exception
@@ -333,7 +363,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	public final boolean isCanceled() {
 		return canceled;
 	}
-	
+
+	/**
+	 * Execute the operator-specific {@link StreamOperator#open()} method in each
+	 * of the operators in the chain of this {@link StreamTask}. Opening happens
+	 * from <b>tail to head</b> operator in the chain, contrary to
+	 * {@link StreamOperator#close()} which happens <b>head to tail</b>
+	 * operator (see {@link #closeAllOperators()}).
+	 */
 	private void openAllOperators() throws Exception {
 		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
 			if (operator != null) {
@@ -342,6 +379,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
+	/**
+	 * Execute the operator-specific {@link StreamOperator#close()} method in each
+	 * of the operators in the chain of this {@link StreamTask}. Closing happens
+	 * from <b>head to tail</b> operator in the chain, contrary to
+	 * {@link StreamOperator#open()} which happens <b>tail to head</b> operator
+	 * (see {@link #openAllOperators()}).
+	 */
 	private void closeAllOperators() throws Exception {
 		// We need to close them first to last, since upstream operators in the chain might emit
 		// elements in their close methods.
@@ -354,6 +398,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
+	/**
+	 * Execute the operator-specific {@link StreamOperator#dispose()} method in each
+	 * of the operators in the chain of this {@link StreamTask}. Disposing happens
+	 * from <b>tail to head</b> operator in the chain.
+	 */
 	private void tryDisposeAllOperators() throws Exception {
 		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
 			if (operator != null) {
@@ -361,7 +410,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 		}
 	}
-	
+
+	/**
+	 * Execute the operator-specific {@link StreamOperator#dispose()} method in each
+	 * of the operators in the chain of this {@link StreamTask}. Disposing happens
+	 * from <b>tail to head</b> operator in the chain.
+	 *
+	 * The difference with the {@link #tryDisposeAllOperators()} is that in case of an
+	 * exception, this method catches it and logs the message.
+	 */
 	private void disposeAllOperators() {
 		if (operatorChain != null) {
 			for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
@@ -389,10 +446,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	protected void finalize() throws Throwable {
 		super.finalize();
 		if (timerService != null) {
-			if (!timerService.isTerminated()) {
-				LOG.warn("Timer service was not shut down. Shutting down in finalize().");
-			}
-			timerService.shutdownNow();
+			timerService.shutdownService();
 		}
 
 		for (Thread checkpointThread : asyncCheckpointThreads) {
@@ -418,8 +472,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	/**
-	 * Gets the lock object on which all operations that involve data and state mutation have to lock. 
-	 
+	 * Gets the lock object on which all operations that involve data and state mutation have to lock.
 	 * @return The checkpoint lock object.
 	 */
 	public Object getCheckpointLock() {
@@ -503,10 +556,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		synchronized (lock) {
 			if (isRunning) {
 
-				// since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur
-				// we immediately emit the checkpoint barriers, so the downstream operators can start
-				// their checkpoint work as soon as possible
+				// Since both state checkpointing and downstream barrier emission occur in this
+				// lock scope, they are an atomic operation regardless of the order in which they occur.
+				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
+				// can start their checkpoint work as soon as possible
 				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
 				
 				// now draw the state snapshot
@@ -689,18 +742,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	 * Registers a timer.
 	 */
 	public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) {
-		long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
-
-		return timerService.schedule(
-			new TriggerTask(this, lock, target, timestamp),
-			delay,
-			TimeUnit.MILLISECONDS);
+		if (timerService == null) {
+			throw new IllegalStateException("The timer service has not been initialized.");
+		}
+		return timerService.registerTimer(timestamp, new TriggerTask(this, lock, target, timestamp));
 	}
 
 	/**
 	 * Check whether an exception was thrown in a Thread other than the main Thread. (For example
 	 * in the processing-time trigger Thread). This will rethrow that exception in case on
-	 * occured.
+	 * occurred.
 	 *
 	 * <p>This must be called in the main loop of {@code StreamTask} subclasses to ensure
 	 * that we propagate failures.

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
new file mode 100644
index 0000000..2314deb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
+ * processing time functionality.
+ * */
+public class TestTimeServiceProvider extends TimeServiceProvider {
+
+	private long currentTime = 0;
+
+	private Map<Long, List<Runnable>> registeredTasks = new HashMap<>();
+
+	public void setCurrentTime(long timestamp) {
+		this.currentTime = timestamp;
+
+		// decide which timers to fire and put them in a list
+		// we do not fire them here to be able to accommodate timers
+		// that register other timers. The latter would throw an exception.
+
+		Iterator<Map.Entry<Long, List<Runnable>>> it = registeredTasks.entrySet().iterator();
+		List<Runnable> toRun = new ArrayList<>();
+		while (it.hasNext()) {
+			Map.Entry<Long, List<Runnable>> t = it.next();
+			if (t.getKey() <= this.currentTime) {
+				for (Runnable r: t.getValue()) {
+					toRun.add(r);
+				}
+				it.remove();
+			}
+		}
+
+		// now do the actual firing.
+		for (Runnable r: toRun) {
+			r.run();
+		}
+	}
+
+	@Override
+	public long getCurrentProcessingTime() {
+		return currentTime;
+	}
+
+	@Override
+	public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+		List<Runnable> tasks = registeredTasks.get(timestamp);
+		if (tasks == null) {
+			tasks = new ArrayList<>();
+			registeredTasks.put(timestamp, tasks);
+		}
+		tasks.add(target);
+		return null;
+	}
+
+	public int getNoOfRegisteredTimers() {
+		int count = 0;
+		for (List<Runnable> tasks: registeredTasks.values()) {
+			count += tasks.size();
+		}
+		return count;
+	}
+
+	@Override
+	public void shutdownService() throws Exception {
+		this.registeredTasks.clear();
+		this.registeredTasks = null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
new file mode 100644
index 0000000..f3e4f78
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Defines the current processing time and handles all related actions,
+ * such as registering timers for tasks to be executed in the future.
+ */
+public abstract class TimeServiceProvider {
+
+	/** Returns the current processing time. */
+	public abstract long getCurrentProcessingTime();
+
+	/** Registers a task to be executed when (processing) time is {@code timestamp}.
+	 * @param timestamp
+	 * 						when the task is to be executed (in processing time)
+	 * @param target
+	 * 						the task to be executed
+	 * @return the result to be returned.
+	 */
+	public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Runnable target);
+
+	/** Shuts down and cleans up the timer service provider. */
+	public abstract void shutdownService() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 153b1fd..c9f204d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -48,6 +49,58 @@ import static org.junit.Assert.*;
 public class StreamTaskTimerTest {
 
 	@Test
+	public void testCustomTimeServiceProvider() throws Throwable {
+		TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+		mapTask.setTimeService(tp);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
+		streamConfig.setStreamOperator(mapOperator);
+
+		testHarness.invoke();
+
+		assertTrue(testHarness.getCurrentProcessingTime() == 0);
+
+		tp.setCurrentTime(11);
+		assertTrue(testHarness.getCurrentProcessingTime() == 11);
+
+		tp.setCurrentTime(15);
+		tp.setCurrentTime(16);
+		assertTrue(testHarness.getCurrentProcessingTime() == 16);
+		
+		// register 2 tasks
+		mapTask.registerTimer(30, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		mapTask.registerTimer(40, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		assertEquals(2, tp.getNoOfRegisteredTimers());
+
+		tp.setCurrentTime(35);
+		assertEquals(1, tp.getNoOfRegisteredTimers());
+
+		tp.setCurrentTime(40);
+		assertEquals(0, tp.getNoOfRegisteredTimers());
+
+		tp.shutdownService();
+	}
+
+	@Test
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 		
@@ -176,6 +229,8 @@ public class StreamTaskTimerTest {
 	
 	public static class DummyMapFunction<T> implements MapFunction<T, T> {
 		@Override
-		public T map(T value) { return value; }
+		public T map(T value) {
+			return value;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9aaf683..4f3ff63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -38,13 +38,18 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
@@ -55,6 +60,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
@@ -68,6 +74,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class WindowOperatorTest {
@@ -592,6 +600,8 @@ public class WindowOperatorTest {
 
 		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
 				.keyBy(new KeySelector<String, String>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public String getKey(String value) throws Exception {
 						return value;
@@ -922,6 +932,193 @@ public class WindowOperatorTest {
 	}
 
 	@Test
+	public void testProcessingTimeTumblingWindows() throws Throwable {
+		final int WINDOW_SIZE = 3;
+		// Exercises a WindowOperator using TumblingProcessingTimeWindows of WINDOW_SIZE seconds with ProcessingTimeTrigger.
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(), 0);
+		// Test clock handed to the harness below; this test moves it forward explicitly via setCurrentTime().
+		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+		// First batch: three "key2" and two "key1" elements are processed while the test clock reads 3.
+		testTimeProvider.setCurrentTime(3);
+
+		// the record timestamps (Long.MAX_VALUE, 7000) are ignored in processing time; only the test clock matters
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
+		// Second batch: three more "key1" elements are processed while the test clock reads 5000.
+		testTimeProvider.setCurrentTime(5000);
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
+		// Advance the clock to 7000 before checking the output; expected per-key sums are stamped 2999 and 5999.
+		testTimeProvider.setCurrentTime(7000);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeSlidingWindows() throws Throwable {
+		final int WINDOW_SIZE = 3;
+		final int WINDOW_SLIDE = 1;
+		// Exercises a WindowOperator using SlidingProcessingTimeWindows (size 3 s, slide 1 s) with ProcessingTimeTrigger.
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				SlidingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(), 0);
+		// Test clock handed to the harness below; this test moves it forward explicitly via setCurrentTime().
+		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		// the record timestamp (Long.MAX_VALUE) is ignored in processing time; only the test clock matters
+		testTimeProvider.setCurrentTime(3);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+
+		testTimeProvider.setCurrentTime(1000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+
+		testTimeProvider.setCurrentTime(2000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+
+		testTimeProvider.setCurrentTime(3000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+		// Advance the clock to 7000 before checking the output.
+		testTimeProvider.setCurrentTime(7000);
+		// Expected per-key sums; each element is expected in several results, stamped 999 through 5999.
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 4999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeSessionWindows() throws Throwable {
+		final int WINDOW_GAP = 3;
+		// Exercises a WindowOperator using ProcessingTimeSessionWindows with a gap of WINDOW_GAP seconds and ProcessingTimeTrigger.
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				ProcessingTimeSessionWindows.withGap(Time.of(WINDOW_GAP, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(), 0);
+		// Test clock handed to the harness below; this test moves it forward explicitly via setCurrentTime().
+		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		// the record timestamps (1, 1002, 5000) are ignored in processing time; only the test clock matters
+		testTimeProvider.setCurrentTime(3);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1));
+
+		testTimeProvider.setCurrentTime(1000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));
+
+		testTimeProvider.setCurrentTime(5000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+		// Advance the clock to 10000 before checking the output; expected per-key sums are stamped 3999 and 7999.
+		testTimeProvider.setCurrentTime(10000);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		// Additional check: same number of outputs, and every emitted StreamRecord is contained in expectedOutput.
+		assertEquals(expectedOutput.size(), testHarness.getOutput().size());
+		for (Object elem : testHarness.getOutput()) {
+			if (elem instanceof StreamRecord) {
+				StreamRecord<Tuple2<String, Integer>> el = (StreamRecord<Tuple2<String, Integer>>) elem;
+				assertTrue(expectedOutput.contains(el));
+			}
+		}
+		testHarness.close();
+	}
+
+	@Test
 	public void testLateness() throws Exception {
 		final int WINDOW_SIZE = 2;
 		final long LATENESS = 500;
@@ -995,16 +1192,16 @@ public class WindowOperatorTest {
 
 		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
 
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+		final WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
 			new WindowOperator<>(
 					windowAssigner,
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-				EventTimeTrigger.create(),
-				LATENESS);
+					new TimeWindow.Serializer(),
+					new TupleKeySelector(),
+					BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+					stateDesc,
+					new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+					EventTimeTrigger.create(),
+					LATENESS);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new OneInputStreamOperatorTestHarness<>(operator);
@@ -1017,7 +1214,12 @@ public class WindowOperatorTest {
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
 		long timestamp = Long.MAX_VALUE - 1750;
-		Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp);
+		Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp, new WindowAssignerContext() {
+			@Override
+			public long getCurrentProcessingTime() {
+				return operator.windowAssignerContext.getCurrentProcessingTime();
+			}
+		});
 		TimeWindow window = Iterables.getOnlyElement(windows);
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), timestamp));
@@ -1883,7 +2085,7 @@ public class WindowOperatorTest {
 
 		@Override
 		@SuppressWarnings("unchecked")
-		public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext ctx) {
 			if (element instanceof Tuple2) {
 				Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) element;
 				if (t2.f1 == 33) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 4639321..00e95b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,6 +113,13 @@ public class StreamTaskTestHarness<OUT> {
 		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
 	}
 
+	/** Returns the task's processing time, or the wall-clock time when the task is not a StreamTask. */
+	public long getCurrentProcessingTime() {
+		return (task instanceof StreamTask)
+				? ((StreamTask<?, ?>) task).getCurrentProcessingTime()
+				: System.currentTimeMillis(); // fallback must be returned, otherwise the cast above would fail
+	}
+
 	/**
 	 * This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6e2e9f9..86137d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -69,6 +70,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	
 	final Object checkpointLock;
 
+	final TimeServiceProvider timeServiceProvider;
+
 	StreamTask<?, ?> mockTask;
 
 	/**
@@ -80,8 +83,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
 		this(operator, new ExecutionConfig());
 	}
-	
+
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) {
+		this(operator, executionConfig, null);
+	}
+
+	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig,
+											TimeServiceProvider testTimeProvider) {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<Object>();
 		this.config = new StreamConfig(new Configuration());
@@ -90,6 +98,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
 		mockTask = mock(StreamTask.class);
+		timeServiceProvider = testTimeProvider;
+
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
@@ -116,29 +126,44 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			public Void answer(InvocationOnMock invocation) throws Throwable {
 				final long execTime = (Long) invocation.getArguments()[0];
 				final Triggerable target = (Triggerable) invocation.getArguments()[1];
-				
-				Thread caller = new Thread() {
-					@Override
-					public void run() {
-						final long delay = execTime - System.currentTimeMillis();
-						if (delay > 0) {
-							try {
-								Thread.sleep(delay);
-							} catch (InterruptedException ignored) {}
-						}
-						
-						synchronized (checkpointLock) {
-							try {
-								target.trigger(execTime);
-							} catch (Exception ignored) {}
+
+				if (timeServiceProvider == null) {
+					Thread caller = new Thread() {
+						@Override
+						public void run() {
+							final long delay = execTime - mockTask.getCurrentProcessingTime();
+							if (delay > 0) {
+								try {
+									Thread.sleep(delay);
+								} catch (InterruptedException ignored) {
+								}
+							}
+
+							synchronized (checkpointLock) {
+								try {
+									target.trigger(execTime);
+								} catch (Exception ignored) {
+								}
+							}
 						}
-					}
-				};
-				caller.start();
-				
+					};
+					caller.start();
+				} else {
+					timeServiceProvider.registerTimer(
+						execTime, new TriggerTask(checkpointLock, target, execTime));
+				}
 				return null;
 			}
 		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+		doAnswer(new Answer<Long>() {
+			@Override
+			public Long answer(InvocationOnMock invocation) throws Throwable {
+				return timeServiceProvider == null ?
+					System.currentTimeMillis() :
+					timeServiceProvider.getCurrentProcessingTime();
+			}
+		}).when(mockTask).getCurrentProcessingTime();
 	}
 
 	public Object getCheckpointLock() {
@@ -201,6 +226,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public void close() throws Exception {
 		operator.close();
 		operator.dispose();
+		if (timeServiceProvider != null) {
+			timeServiceProvider.shutdownService();
+		}
 		setupCalled = false;
 	}
 
@@ -243,4 +271,32 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			// ignore
 		}
 	}
+
+	/** Runnable that invokes a {@link Triggerable} for a fixed timestamp while holding a lock. */
+	private static final class TriggerTask implements Runnable {
+		private final Object monitor;
+		private final Triggerable callback;
+		private final long fireTime;
+
+		TriggerTask(final Object lock, Triggerable target, long timestamp) {
+			this.monitor = lock;
+			this.callback = target;
+			this.fireTime = timestamp;
+		}
+
+		/**
+		 * Calls {@code trigger(timestamp)} on the target while synchronized on the lock.
+		 * Exceptions are printed via printStackTrace() and swallowed; anything else propagates.
+		 */
+		@Override
+		public void run() {
+			synchronized (monitor) {
+				try {
+					callback.trigger(fireTime);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
 }


Mime
View raw message