flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [08/11] flink git commit: [FLINK-3674] Add an interface for Time aware User Functions
Date Fri, 21 Oct 2016 17:14:23 GMT
[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
WindowOperator behind a well defined interface that can be used by
operators (and user functions).

InternalTimerService is the new interface that has the same
functionality that WindowOperator used to have. TimerService is the user
facing interface that does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in HeapInternalTimerService that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
AbstractStreamOperator and operators can ask for an
InternalTimerService.

This also adds tests for HeapInternalTimerService.

This adds two new user functions:
 - TimelyFlatMapFunction: an extension of FlatMapFunction that also
   allows querying time and setting timers
 - TimelyCoFlatMapFunction: the same, but for CoFlatMapFunction

There are two new StreamOperator implementations for these that use the
InternalTimerService interface.

This also adds tests for the two new operators.

This also adds the new interface KeyContext that is used for
setting/querying the current key context for state and timers. Timers
are always scoped to a key, for now.

Also, this moves the handling of watermarks for both one-input and
two-input operators to AbstractStreamOperators so that we have a central
ground-truth.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81b19e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81b19e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81b19e53

Branch: refs/heads/master
Commit: 81b19e5323edd384e00f77eaa4a5c543db7e2499
Parents: f305baa
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Sep 26 16:21:51 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  17 +-
 .../flink/storm/wrappers/BoltWrapper.java       |   7 -
 .../operator/AbstractCEPPatternOperator.java    |   9 +-
 .../AbstractKeyedCEPPatternOperator.java        |   7 +-
 .../flink/streaming/api/SimpleTimerService.java |  55 ++
 .../apache/flink/streaming/api/TimeDomain.java  |  34 ++
 .../flink/streaming/api/TimerService.java       |  53 ++
 .../streaming/api/datastream/KeyedStream.java   |  41 +-
 .../functions/RichTimelyFlatMapFunction.java    |  40 ++
 .../api/functions/TimelyFlatMapFunction.java    |  78 +++
 .../co/RichTimelyCoFlatMapFunction.java         |  41 ++
 .../functions/co/TimelyCoFlatMapFunction.java   |  92 ++++
 .../query/AbstractQueryableStateOperator.java   |   6 -
 .../source/ContinuousFileReaderOperator.java    |   2 +
 .../api/operators/AbstractStreamOperator.java   | 177 +++++-
 .../operators/AbstractUdfStreamOperator.java    |   2 +
 .../api/operators/HeapInternalTimerService.java | 318 +++++++++++
 .../streaming/api/operators/InternalTimer.java  |  90 +++
 .../api/operators/InternalTimerService.java     |  60 ++
 .../streaming/api/operators/KeyContext.java     |  31 ++
 .../streaming/api/operators/StreamFilter.java   |   6 -
 .../streaming/api/operators/StreamFlatMap.java  |   8 +-
 .../api/operators/StreamGroupedFold.java        |   6 -
 .../api/operators/StreamGroupedReduce.java      |   7 -
 .../streaming/api/operators/StreamMap.java      |   6 -
 .../streaming/api/operators/StreamProject.java  |   6 -
 .../streaming/api/operators/StreamSink.java     |   6 -
 .../api/operators/StreamTimelyFlatMap.java      |  79 +++
 .../streaming/api/operators/Triggerable.java    |  40 ++
 .../api/operators/co/CoStreamFlatMap.java       |  27 -
 .../streaming/api/operators/co/CoStreamMap.java |  27 -
 .../api/operators/co/CoStreamTimelyFlatMap.java |  96 ++++
 .../operators/GenericWriteAheadSink.java        |   9 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   6 -
 .../windowing/AccumulatingKeyedTimePanes.java   |   4 +-
 .../windowing/AggregatingKeyedTimePanes.java    |   2 +-
 .../windowing/EvictingWindowOperator.java       | 150 +++--
 .../operators/windowing/WindowOperator.java     | 312 +++--------
 .../tasks/TestProcessingTimeService.java        |  62 ++-
 .../operators/HeapInternalTimerServiceTest.java | 509 +++++++++++++++++
 .../api/operators/TimelyFlatMapTest.java        | 410 ++++++++++++++
 .../api/operators/co/TimelyCoFlatMapTest.java   | 544 +++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   |   9 +-
 .../test/checkpointing/RescalingITCase.java     |   5 +
 .../test/checkpointing/SavepointITCase.java     |   1 +
 .../test/streaming/runtime/TimestampITCase.java |   9 +-
 46 files changed, 3017 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 4d1ab50..fdd1bf4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -39,7 +41,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -364,7 +366,7 @@ public class RocksDBAsyncSnapshotTest {
 
 	public static class AsyncCheckpointOperator
 		extends AbstractStreamOperator<String>
-		implements OneInputStreamOperator<String, String> {
+		implements OneInputStreamOperator<String, String>, StreamCheckpointedOperator {
 
 		@Override
 		public void open() throws Exception {
@@ -394,9 +396,16 @@ public class RocksDBAsyncSnapshotTest {
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			// not interested
+		public void snapshotState(
+				FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+			// do nothing so that we don't block
 		}
+
+		@Override
+		public void restoreState(FSDataInputStream in) throws Exception {
+			// do nothing so that we don't block
+		}
+
 	}
 
 	public static class DummyMapFunction<T> implements MapFunction<T, T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index d59ff04..55a8e28 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -36,7 +36,6 @@ import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
@@ -318,10 +317,4 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 					MessageId.makeUnanchored()));
 		}
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		this.output.emitWatermark(mark);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 455e864..1deb192 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -65,7 +65,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	}
 
 	@Override
-	public void open() {
+	public void open() throws Exception {
+		super.open();
 		if (priorityQueue == null) {
 			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
 		}
@@ -93,6 +94,9 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
+		// we do our own watermark handling, no super call. we will never be able to use
+		// the timer service like this, however.
+
 		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
 			StreamRecord<IN> streamRecord = priorityQueue.poll();
 
@@ -104,6 +108,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+		super.snapshotState(out, checkpointId, timestamp);
 		final ObjectOutputStream oos = new ObjectOutputStream(out);
 
 		oos.writeObject(nfa);
@@ -118,6 +123,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	@Override
 	@SuppressWarnings("unchecked")
 	public void restoreState(FSDataInputStream state) throws Exception {
+		super.restoreState(state);
+
 		final ObjectInputStream ois = new ObjectInputStream(state);
 
 		nfa = (NFA<IN>)ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index c3898c3..54baf6d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -166,9 +166,12 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
+		// we do our own watermark handling, no super call. we will never be able to use
+		// the timer service like this, however.
+
 		// iterate over all keys to trigger the execution of the buffered elements
 		for (KEY key: keys) {
-			setKeyContext(key);
+			setCurrentKey(key);
 
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 			NFA<IN> nfa = getNFA();
@@ -187,6 +190,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+		super.snapshotState(out, checkpointId, timestamp);
 
 		DataOutputView ov = new DataOutputViewStreamWrapper(out);
 		ov.writeInt(keys.size());
@@ -198,6 +202,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 	@Override
 	public void restoreState(FSDataInputStream state) throws Exception {
+		super.restoreState(state);
 
 		DataInputView inputView = new DataInputViewStreamWrapper(state);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
new file mode 100644
index 0000000..43d2659
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+
+/**
+ * Implementation of {@link TimerService} that uses a {@link InternalTimerService}.
+ */
+@Internal
+public class SimpleTimerService implements TimerService {
+
+	private final InternalTimerService<VoidNamespace> internalTimerService;
+
+	public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
+		this.internalTimerService = internalTimerService;
+	}
+
+	@Override
+	public long currentProcessingTime() {
+		return internalTimerService.currentProcessingTime();
+	}
+
+	@Override
+	public long currentWatermark() {
+		return internalTimerService.currentWatermark();
+	}
+
+	@Override
+	public void registerProcessingTimeTimer(long time) {
+		internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
+	}
+
+	@Override
+	public void registerEventTimeTimer(long time) {
+		internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
new file mode 100644
index 0000000..7cdfdc2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+/**
+ * {@code TimeDomain} specifies whether a firing timer is based on event time or processing time.
+ */
+public enum TimeDomain {
+
+	/**
+	 * Time is based on the timestamp of events.
+	 */
+	EVENT_TIME,
+
+	/**
+	 * Time is based on the current processing-time of a machine where processing happens.
+	 */
+	PROCESSING_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
new file mode 100644
index 0000000..ef8b631
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for working with time and timers.
+ */
+@PublicEvolving
+public interface TimerService {
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+
+	/** Returns the current event-time watermark. */
+	long currentWatermark();
+
+	/**
+	 * Registers a timer to be fired when processing time passes the given time.
+	 *
+	 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
+	 * in a keyed context, such as in an operation on
+	 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
+	 * will also be active when you receive the timer notification.
+	 */
+	void registerProcessingTimeTimer(long time);
+
+	/**
+	 * Registers a timer to be fired when the event time watermark passes the given time.
+	 *
+	 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
+	 * in a keyed context, such as in an operation on
+	 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
+	 * will also be active when you receive the timer notification.
+	 */
+	void registerEventTimeTimer(long time);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index af907e3..1bce6a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -32,6 +34,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -41,6 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -169,7 +173,42 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		result.getTransformation().setStateKeyType(keyType);
 		return result;
 	}
-	
+
+	/**
+	 * Applies a FlatMap transformation on a {@link DataStream}. The
+	 * transformation calls a {@link FlatMapFunction} for each element of the
+	 * DataStream. Each FlatMapFunction call can return any number of elements
+	 * including none. The user can also extend {@link RichFlatMapFunction} to
+	 * gain access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param flatMapper
+	 *            The FlatMapFunction that is called for each element of the
+	 *            DataStream
+	 *
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+
+		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+				flatMapper,
+				TimelyFlatMapFunction.class,
+				false,
+				true,
+				getType(),
+				Utils.getCallLocationName(),
+				true);
+
+		StreamTimelyFlatMap<KEY, T, R> operator =
+				new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
+
+		return transform("Flat Map", outType, operator);
+
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Windowing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
new file mode 100644
index 0000000..0d86da9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link TimelyFlatMapFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichTimelyFlatMapFunction<I, O>
+		extends AbstractRichFunction
+		implements TimelyFlatMapFunction<I, O> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
new file mode 100644
index 0000000..77fe35e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * <pre>{@code
+ * DataStream<X> input = ...;
+ *
+ * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }</pre>
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+	/**
+	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
+	 * it into zero, one, or more elements.
+	 *
+	 * @param value The input value.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                        current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param timeDomain The {@link TimeDomain} of the firing timer.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                        current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
new file mode 100644
index 0000000..12fe181
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
+		extends AbstractRichFunction
+		implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
new file mode 100644
index 0000000..87355c6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
+ * connected streams.
+ * 
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ *
+ * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link CoFlatMapFunction}, also set timers and react to them firing.
+ * 
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto elements of a stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the 
+ * current set of rules in the state. It may receive either a rule update (from the first stream)
+ * and update the state, or a data element (from the second stream) and apply the rules in the
+ * state to the element. The result of applying the rules would be emitted.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 * 
+	 * @param value The stream element
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                        current time.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void flatMap1(IN1 value, TimerService timerService, Collector<OUT> out) throws Exception;
+
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 * 
+	 * @param value The stream element
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                        current time.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void flatMap2(IN2 value, TimerService timerService, Collector<OUT> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param timeDomain The {@link TimeDomain} of the firing timer.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                        current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<OUT> out) throws Exception ;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
index 09c9b01..7522a61 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -76,9 +75,4 @@ abstract class AbstractQueryableStateOperator<S extends State, IN>
 		super.open();
 		state = getPartitionedState(stateDescriptor);
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// Nothing to do
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index be22677..4cc5206 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -387,6 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 	@Override
 	public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception {
+		super.snapshotState(os, checkpointId, timestamp);
 
 		final ObjectOutputStream oos = new ObjectOutputStream(os);
 
@@ -409,6 +410,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 	@Override
 	public void restoreState(FSDataInputStream is) throws Exception {
+		super.restoreState(is);
 
 		final ObjectInputStream ois = new ObjectInputStream(is);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b789c95..82ce493 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -21,12 +21,17 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -82,7 +87,7 @@ import java.util.Map;
  */
 @PublicEvolving
 public abstract class AbstractStreamOperator<OUT>
-		implements StreamOperator<OUT>, java.io.Serializable {
+		implements StreamOperator<OUT>, java.io.Serializable, KeyContext, StreamCheckpointedOperator {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -99,7 +104,7 @@ public abstract class AbstractStreamOperator<OUT>
 	/** The task that contains this operator (and other operators in the same chain) */
 	private transient StreamTask<?, ?> container;
 	
-	private transient StreamConfig config;
+	protected transient StreamConfig config;
 
 	protected transient Output<StreamRecord<OUT>> output;
 
@@ -107,7 +112,6 @@ public abstract class AbstractStreamOperator<OUT>
 	private transient StreamingRuntimeContext runtimeContext;
 
 
-
 	// ---------------- key/value state ------------------
 
 	/** key selector used to get the key for the state. Non-null only is the operator uses key/value state */
@@ -131,6 +135,20 @@ public abstract class AbstractStreamOperator<OUT>
 
 	protected LatencyGauge latencyGauge;
 
+	// ---------------- timers ------------------
+
+	private transient Map<String, HeapInternalTimerService<?, ?>> timerServices;
+	private transient Map<String, HeapInternalTimerService.RestoredTimers<?, ?>> restoredServices;
+
+
+	// ---------------- two-input operator watermarks ------------------
+
+	// We keep track of watermarks from both inputs, the combined input is the minimum
+	// Once the minimum advances we emit a new watermark for downstream operators
+	private long combinedWatermark = Long.MIN_VALUE;
+	private long input1Watermark = Long.MIN_VALUE;
+	private long input2Watermark = Long.MIN_VALUE;
+
 	// ------------------------------------------------------------------------
 	//  Life Cycle
 	// ------------------------------------------------------------------------
@@ -230,7 +248,9 @@ public abstract class AbstractStreamOperator<OUT>
 	 */
 	@Override
 	public void open() throws Exception {
-
+		if (timerServices == null) {
+			timerServices = new HashMap<>();
+		}
 	}
 
 	private void initKeyedState() {
@@ -449,12 +469,12 @@ public abstract class AbstractStreamOperator<OUT>
 	private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
 		if (selector != null) {
 			Object key = selector.getKey(record.getValue());
-			setKeyContext(key);
+			setCurrentKey(key);
 		}
 	}
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setKeyContext(Object key) {
+	public void setCurrentKey(Object key) {
 		if (keyedStateBackend != null) {
 			try {
 				// need to work around type restrictions
@@ -468,6 +488,15 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public Object getCurrentKey() {
+		if (keyedStateBackend != null) {
+			return keyedStateBackend.getCurrentKey();
+		} else {
+			throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
+		}
+	}
+
 	public KeyedStateStore getKeyedStateStore() {
 		return keyedStateStore;
 	}
@@ -666,4 +695,140 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Watermark handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a {@link InternalTimerService} that can be used to query current processing time
+	 * and event time and to set timers. An operator can have several timer services, where
+	 * each has its own namespace serializer. Timer services are differentiated by the string
+	 * key that is given when requesting them, if you call this method with the same key
+	 * multiple times you will get the same timer service instance in subsequent requests.
+	 *
+	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
+	 * When a timer fires, this key will also be set as the currently active key.
+	 *
+	 * <p>Each timer has attached metadata, the namespace. Different timer services
+	 * can have a different namespace type. If you don't need namespace differentiation you
+	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+	 *
+	 * @param name The name of the requested timer service. If no service exists under the given
+	 *             name a new one will be created and returned.
+	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+	 *
+	 * @param <K> The type of the timer keys.
+	 * @param <N> The type of the timer namespace.
+	 */
+	public <K, N> InternalTimerService<N> getInternalTimerService(
+			String name,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			Triggerable<K, N> triggerable) {
+
+		@SuppressWarnings("unchecked")
+		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+		if (service == null) {
+			if (restoredServices != null && restoredServices.containsKey(name)) {
+				@SuppressWarnings("unchecked")
+				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+						(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+				service = new HeapInternalTimerService<>(
+						keySerializer,
+						namespaceSerializer,
+						triggerable,
+						this,
+						getRuntimeContext().getProcessingTimeService(),
+						restoredService);
+
+			} else {
+				service = new HeapInternalTimerService<>(
+						keySerializer,
+						namespaceSerializer,
+						triggerable,
+						this,
+						getRuntimeContext().getProcessingTimeService());
+			}
+			timerServices.put(name, service);
+		}
+
+		return service;
+	}
+
+	public void processWatermark(Watermark mark) throws Exception {
+		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+			service.advanceWatermark(mark.getTimestamp());
+		}
+		output.emitWatermark(mark);
+	}
+
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark) {
+			combinedWatermark = newMin;
+			processWatermark(new Watermark(combinedWatermark));
+		}
+	}
+
+	public void processWatermark2(Watermark mark) throws Exception {
+		input2Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark) {
+			combinedWatermark = newMin;
+			processWatermark(new Watermark(combinedWatermark));
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+		DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(out);
+
+		dataOutputView.writeInt(timerServices.size());
+
+		for (Map.Entry<String, HeapInternalTimerService<?, ?>> service : timerServices.entrySet()) {
+			dataOutputView.writeUTF(service.getKey());
+			service.getValue().snapshotTimers(dataOutputView);
+		}
+
+	}
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+		DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(in);
+
+		restoredServices = new HashMap<>();
+
+		int numServices = dataInputView.readInt();
+
+		for (int i = 0; i < numServices; i++) {
+			String name = dataInputView.readUTF();
+			HeapInternalTimerService.RestoredTimers restoredService =
+					new HeapInternalTimerService.RestoredTimers(in, getUserCodeClassloader());
+			restoredServices.put(name, restoredService);
+		}
+	}
+
+	@VisibleForTesting
+	public int numProcessingTimeTimers() {
+		int count = 0;
+		for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+			count += timerService.numProcessingTimeTimers();
+		}
+		return count;
+	}
+
+	@VisibleForTesting
+	public int numEventTimeTimers() {
+		int count = 0;
+		for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+			count += timerService.numEventTimeTimers();
+		}
+		return count;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 5e1a252..67d204a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -176,6 +176,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 	
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+		super.snapshotState(out, checkpointId, timestamp);
 
 
 		if (userFunction instanceof Checkpointed) {
@@ -199,6 +200,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
+		super.restoreState(in);
 
 		if (userFunction instanceof CheckpointedRestoring) {
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
new file mode 100644
index 0000000..c77b634
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
+
+	private final TypeSerializer<K> keySerializer;
+
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final ProcessingTimeService processingTimeService;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+	private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+	protected ScheduledFuture<?> nextTimer = null;
+
+	/**
+	 * Currently waiting watermark callbacks.
+	 */
+	private final Set<InternalTimer<K, N>> eventTimeTimers;
+	private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		eventTimeTimers = new HashSet<>();
+		eventTimeTimersQueue = new PriorityQueue<>(100);
+
+		processingTimeTimers = new HashSet<>();
+		processingTimeTimersQueue = new PriorityQueue<>(100);
+	}
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService,
+			RestoredTimers<K, N> restoredTimers) {
+
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		eventTimeTimers = restoredTimers.watermarkTimers;
+		eventTimeTimersQueue = restoredTimers.watermarkTimersQueue;
+
+		processingTimeTimers = restoredTimers.processingTimeTimers;
+		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
+
+		// re-register the restored timers (if any)
+		if (processingTimeTimersQueue.size() > 0) {
+			nextTimer =
+					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
+		}
+	}
+
+
+	@Override
+	public long currentProcessingTime() {
+		return processingTimeService.getCurrentProcessingTime();
+	}
+
+	@Override
+	public long currentWatermark() {
+		return currentWatermark;
+	}
+
+	@Override
+	public void registerProcessingTimeTimer(N namespace, long time) {
+		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
+
+		// make sure we only put one timer per key into the queue
+		if (processingTimeTimers.add(timer)) {
+
+			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
+
+			processingTimeTimersQueue.add(timer);
+
+			// check if we need to re-schedule our timer to earlier
+			if (time < nextTriggerTime) {
+				if (nextTimer != null) {
+					nextTimer.cancel(false);
+				}
+				nextTimer = processingTimeService.registerTimer(time, this);
+			}
+		}
+	}
+
+	@Override
+	public void registerEventTimeTimer(N namespace, long time) {
+		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
+		if (eventTimeTimers.add(timer)) {
+			eventTimeTimersQueue.add(timer);
+		}
+	}
+
+	@Override
+	public void deleteProcessingTimeTimer(N namespace, long time) {
+		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
+
+		if (processingTimeTimers.remove(timer)) {
+			processingTimeTimersQueue.remove(timer);
+		}
+	}
+
+	@Override
+	public void deleteEventTimeTimer(N namespace, long time) {
+		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
+		if (eventTimeTimers.remove(timer)) {
+			eventTimeTimersQueue.remove(timer);
+		}
+	}
+
+	@Override
+	public void trigger(long time) throws Exception {
+		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
+		// inside the callback.
+		nextTimer = null;
+
+		InternalTimer<K, N> timer;
+
+		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
+
+			processingTimeTimers.remove(timer);
+			processingTimeTimersQueue.remove();
+
+			keyContext.setCurrentKey(timer.getKey());
+			triggerTarget.onProcessingTime(timer);
+		}
+
+		if (timer != null) {
+			if (nextTimer == null) {
+				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
+			}
+		}
+	}
+
+	public void advanceWatermark(long time) throws Exception {
+		currentWatermark = time;
+
+		InternalTimer<K, N> timer;
+
+		while ((timer  = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
+
+			eventTimeTimers.remove(timer);
+			eventTimeTimersQueue.remove();
+
+			keyContext.setCurrentKey(timer.getKey());
+			triggerTarget.onEventTime(timer);
+
+			timer = eventTimeTimersQueue.peek();
+		}
+	}
+
+	public void snapshotTimers(OutputStream outStream) throws IOException {
+		InstantiationUtil.serializeObject(outStream, keySerializer);
+		InstantiationUtil.serializeObject(outStream, namespaceSerializer);
+
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(outStream);
+
+		out.writeInt(eventTimeTimers.size());
+		for (InternalTimer<K, N> timer : eventTimeTimers) {
+			keySerializer.serialize(timer.getKey(), out);
+			namespaceSerializer.serialize(timer.getNamespace(), out);
+			out.writeLong(timer.getTimestamp());
+		}
+
+		out.writeInt(processingTimeTimers.size());
+		for (InternalTimer<K, N> timer : processingTimeTimers) {
+			keySerializer.serialize(timer.getKey(), out);
+			namespaceSerializer.serialize(timer.getNamespace(), out);
+			out.writeLong(timer.getTimestamp());
+		}
+	}
+
+	public int numProcessingTimeTimers() {
+		return processingTimeTimers.size();
+	}
+
+	public int numEventTimeTimers() {
+		return eventTimeTimers.size();
+	}
+
+	public int numProcessingTimeTimers(N namespace) {
+		int count = 0;
+		for (InternalTimer<K, N> timer : processingTimeTimers) {
+			if (timer.getNamespace().equals(namespace)) {
+				count++;
+			}
+		}
+
+		return count;
+	}
+
+	public int numEventTimeTimers(N namespace) {
+		int count = 0;
+		for (InternalTimer<K, N> timer : eventTimeTimers) {
+			if (timer.getNamespace().equals(namespace)) {
+				count++;
+			}
+		}
+
+		return count;
+	}
+
+	public static class RestoredTimers<K, N> {
+
+		private final TypeSerializer<K> keySerializer;
+
+		private final TypeSerializer<N> namespaceSerializer;
+
+		/**
+		 * Processing time timers that are currently in-flight.
+		 */
+		private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+		private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+		/**
+		 * Currently waiting watermark callbacks.
+		 */
+		private final Set<InternalTimer<K, N>> watermarkTimers;
+		private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+		public RestoredTimers(InputStream inputStream, ClassLoader userCodeClassLoader) throws Exception {
+
+			watermarkTimers = new HashSet<>();
+			watermarkTimersQueue = new PriorityQueue<>(100);
+
+			processingTimeTimers = new HashSet<>();
+			processingTimeTimersQueue = new PriorityQueue<>(100);
+
+			keySerializer = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+			namespaceSerializer = InstantiationUtil.deserializeObject(
+					inputStream,
+					userCodeClassLoader);
+
+			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);
+
+			int numWatermarkTimers = inView.readInt();
+			for (int i = 0; i < numWatermarkTimers; i++) {
+				K key = keySerializer.deserialize(inView);
+				N namespace = namespaceSerializer.deserialize(inView);
+				long timestamp = inView.readLong();
+				InternalTimer<K, N> timer = new InternalTimer<>(timestamp, key, namespace);
+				watermarkTimers.add(timer);
+				watermarkTimersQueue.add(timer);
+			}
+
+			int numProcessingTimeTimers = inView.readInt();
+			for (int i = 0; i < numProcessingTimeTimers; i++) {
+				K key = keySerializer.deserialize(inView);
+				N namespace = namespaceSerializer.deserialize(inView);
+				long timestamp = inView.readLong();
+				InternalTimer<K, N> timer = new InternalTimer<>(timestamp, key, namespace);
+				processingTimeTimersQueue.add(timer);
+				processingTimeTimers.add(timer);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
new file mode 100644
index 0000000..c74ac2e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Internal class for keeping track of in-flight timers.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> {
+	private final long timestamp;
+	private final K key;
+	private final N namespace;
+
+	public InternalTimer(long timestamp, K key, N namespace) {
+		this.timestamp = timestamp;
+		this.key = key;
+		this.namespace = namespace;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public K getKey() {
+		return key;
+	}
+
+	public N getNamespace() {
+		return namespace;
+	}
+
+	@Override
+	public int compareTo(InternalTimer<K, N> o) {
+		return Long.compare(this.timestamp, o.timestamp);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()){
+			return false;
+		}
+
+		InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
+
+		return timestamp == timer.timestamp
+				&& key.equals(timer.key)
+				&& namespace.equals(timer.namespace);
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (timestamp ^ (timestamp >>> 32));
+		result = 31 * result + key.hashCode();
+		result = 31 * result + namespace.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "Timer{" +
+				"timestamp=" + timestamp +
+				", key=" + key +
+				", namespace=" + namespace +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
new file mode 100644
index 0000000..805f9d4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+
+	/** Returns the current event-time watermark. */
+	long currentWatermark();
+
+	/**
+	 * Registers a timer to be fired when processing time passes the given time. The namespace
+	 * you pass here will be provided when the timer fires.
+	 */
+	void registerProcessingTimeTimer(N namespace, long time);
+
+	/**
+	 * Deletes the timer for the given key and namespace.
+	 */
+	void deleteProcessingTimeTimer(N namespace, long time);
+
+	/**
+	 * Registers a timer to be fired when processing time passes the given time. The namespace
+	 * you pass here will be provided when the timer fires.
+	 */
+	void registerEventTimeTimer(N namespace, long time);
+
+	/**
+	 * Deletes the timer for the given key and namespace.
+	 */
+	void deleteEventTimeTimer(N namespace, long time);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
new file mode 100644
index 0000000..e0fd493
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * Inteface for setting and querying the current key of keyed operations.
+ *
+ * <p>This is mainly used by the timer system to query the key when creating timers
+ * and to set the correct key context when firing a timer.
+ */
+public interface KeyContext {
+
+	void setCurrentKey(Object key);
+
+	Object getCurrentKey();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index de1f8d3..2df95ca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -38,9 +37,4 @@ public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFuncti
 			output.collect(element);
 		}
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index ec7b713..c3ad260 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -39,7 +38,7 @@ public class StreamFlatMap<IN, OUT>
 	@Override
 	public void open() throws Exception {
 		super.open();
-		collector = new TimestampedCollector<OUT>(output);
+		collector = new TimestampedCollector<>(output);
 	}
 
 	@Override
@@ -47,9 +46,4 @@ public class StreamFlatMap<IN, OUT>
 		collector.setTimestamp(element);
 		userFunction.flatMap(element.getValue(), collector);
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 7bd7380..86fd8e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -92,11 +91,6 @@ public class StreamGroupedFold<IN, OUT, KEY>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-	@Override
 	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
 		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 229c254..48b9c2d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -64,10 +63,4 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 			output.collect(element.replace(value));
 		}
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
index a505001..6755bc0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -38,9 +37,4 @@ public class StreamMap<IN, OUT>
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		output.collect(element.replace(userFunction.map(element.getValue())));
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index 9c2242f..ef51d8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -58,9 +57,4 @@ public class StreamProject<IN, OUT extends Tuple>
 		super.open();
 		outTuple = outSerializer.createInstance();
 	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index bd0f574..e238566 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -40,11 +39,6 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// ignore it for now, we are a sink, after all
-	}
-
-	@Override
 	protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
 		// all operators are tracking latencies
 		this.latencyGauge.reportLatency(maker, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
new file mode 100644
index 0000000..962f264
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@Internal
+public class StreamTimelyFlatMap<K, IN, OUT>
+		extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<K> keySerializer;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+		super(flatMapper);
+
+		this.keySerializer = keySerializer;
+
+		chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector);
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element);
+		userFunction.flatMap(element.getValue(), timerService, collector);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
new file mode 100644
index 0000000..36e9ad1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable<K, N> {
+
+	/**
+	 * Invoked when an event-time timer fires.
+	 */
+	void onEventTime(InternalTimer<K, N> timer) throws Exception;
+
+	/**
+	 * Invoked when a processing-time timer fires.
+	 */
+	void onProcessingTime(InternalTimer<K, N> timer) throws Exception ;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 580a860..ee58a0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -34,12 +33,6 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 
 	private transient TimestampedCollector<OUT> collector;
 
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
 	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
 	}
@@ -63,26 +56,6 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 		userFunction.flatMap2(element.getValue(), collector);
 	}
 
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
 	protected TimestampedCollector<OUT> getCollector() {
 		return collector;
 	}


Mime
View raw message