flink-commits mailing list archives

From kklou...@apache.org
Subject flink git commit: [FLINK-8560] Add KeyedProcessFunction exposing key in onTimer().
Date Tue, 06 Mar 2018 16:18:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 38785a007 -> 159986292


[FLINK-8560] Add KeyedProcessFunction exposing key in onTimer().

This closes #5481.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15998629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15998629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15998629

Branch: refs/heads/master
Commit: 159986292e35a71737bcc434d5f20f385973fafa
Parents: 38785a0
Author: Bowen Li <bowenli86@gmail.com>
Authored: Thu Feb 15 21:37:44 2018 +0100
Committer: kkloudas <kkloudas@gmail.com>
Committed: Tue Mar 6 16:52:50 2018 +0100

----------------------------------------------------------------------
 docs/dev/stream/operators/process_function.md   |  29 +-
 ...KeyedProcessOperatorWithWatermarkDelay.scala |   6 +-
 .../runtime/harness/NonWindowHarnessTest.scala  |   6 +-
 .../runtime/harness/OverWindowHarnessTest.scala |  16 +-
 .../SortProcessFunctionHarnessTest.scala        |   6 +-
 .../streaming/api/datastream/KeyedStream.java   |  72 ++-
 .../api/functions/KeyedProcessFunction.java     | 130 +++++
 .../api/operators/KeyedProcessOperator.java     |  46 +-
 .../operators/LegacyKeyedProcessOperator.java   | 178 +++++++
 .../flink/streaming/api/DataStreamTest.java     |  43 +-
 .../api/operators/KeyedProcessOperatorTest.java |  82 ++--
 .../LegacyKeyedProcessOperatorTest.java         | 483 +++++++++++++++++++
 .../flink/streaming/api/scala/KeyedStream.scala |  37 +-
 .../streaming/api/scala/DataStreamTest.scala    |  41 +-
 14 files changed, 1078 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index a52c5bf..d967983 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -242,4 +242,31 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. It is
 harmful because processing-time timestamps are non-deterministic and not aligned with watermarks. Moreover, user-implemented logic
 that depends on this wrong timestamp is most likely unintentionally faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
+    K key = ctx.getCurrentKey();
+    // ...
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
+  val key = ctx.getCurrentKey
+  // ...
+}
+{% endhighlight %}
+</div>
+</div>
\ No newline at end of file
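
As a quick illustration of the documented API (not part of this commit), here is a minimal, self-contained sketch of a KeyedProcessFunction that reads the key of a firing timer via ctx.getCurrentKey(). The class name TimeoutReporter and the 60-second timeout are illustrative assumptions, not code from the patch.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch only: emits "<key> timed out" when a processing-time timer fires.
public class TimeoutReporter extends KeyedProcessFunction<String, String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		// Register a processing-time timer 60 seconds from now (assumed timeout).
		ctx.timerService().registerProcessingTimeTimer(
				ctx.timerService().currentProcessingTime() + 60000L);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
		// The key of the firing timer is available directly on the OnTimerContext.
		String key = ctx.getCurrentKey();
		out.collect(key + " timed out at " + timestamp);
	}
}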

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
index 74b4773..f63bdb5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
@@ -19,16 +19,16 @@
 package org.apache.flink.table.runtime.operators
 
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A [[KeyedProcessOperator]] that supports holding back watermarks with a static delay.
+  * A [[LegacyKeyedProcessOperator]] that supports holding back watermarks with a static delay.
   */
 class KeyedProcessOperatorWithWatermarkDelay[KEY, IN, OUT](
     private val function: ProcessFunction[IN, OUT],
     private var watermarkDelay: Long = 0L)
-  extends KeyedProcessOperator[KEY, IN, OUT](function) {
+  extends LegacyKeyedProcessOperator[KEY, IN, OUT](function) {
 
   /** emits watermark without delay */
   def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index ad50761..5c31cb2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate._
@@ -39,7 +39,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindow(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new GroupAggProcessFunction(
         genSumAggFunction,
         sumAggregationStateType,
@@ -99,7 +99,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowWithRetract(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new GroupAggProcessFunction(
         genSumAggFunction,
         sumAggregationStateType,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index def1972..6f6fc0e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.runtime.aggregate._
@@ -40,7 +40,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRowsOver(
         genMinMaxAggFunction,
         2,
@@ -141,7 +141,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRangeOver(
         genMinMaxAggFunction,
         4000,
@@ -250,7 +250,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeUnboundedOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeUnboundedOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -342,7 +342,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRangeOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -492,7 +492,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRowsOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -640,7 +640,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRangeOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -776,7 +776,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRowsOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 9490039..457bde2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.typeutils.runtime.RowComparator
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
@@ -71,7 +71,7 @@ class SortProcessFunctionHarnessTest {
     
     val inputCRowType = CRowTypeInfo(rT)
     
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
       new ProcTimeSortProcessFunction(
         inputCRowType,
         collectionRowComparator))
@@ -170,7 +170,7 @@ class SortProcessFunctionHarnessTest {
 
     val inputCRowType = CRowTypeInfo(rT)
 
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
       new RowTimeSortProcessFunction(
         inputCRowType,
         4,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7beaa03..a948ae2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperato
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
@@ -272,8 +274,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies the given {@link ProcessFunction} on the input stream, thereby
-	 * creating a transformed output stream.
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 	 *
 	 * <p>The function will be called for every element in the input streams and can produce zero
 	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
@@ -286,7 +287,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
+	 *
+	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
 	 */
+	@Deprecated
 	@Override
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
@@ -306,8 +310,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies the given {@link ProcessFunction} on the input stream, thereby
-	 * creating a transformed output stream.
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby creating a transformed output stream.
 	 *
 	 * <p>The function will be called for every element in the input streams and can produce zero
 	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
@@ -321,19 +324,76 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
+	 *
+	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
 	 */
+	@Deprecated
 	@Override
 	@Internal
 	public <R> SingleOutputStreamOperator<R> process(
 			ProcessFunction<T, R> processFunction,
 			TypeInformation<R> outputType) {
 
-		KeyedProcessOperator<KEY, T, R> operator =
-				new KeyedProcessOperator<>(clean(processFunction));
+		LegacyKeyedProcessOperator<KEY, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
 
 		return transform("Process", outputType, operator);
 	}
 
+	/**
+	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
+	 *
+	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+	 *
+	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
+
+		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+				keyedProcessFunction,
+				KeyedProcessFunction.class,
+				1,
+				2,
+				TypeExtractor.NO_INDEX,
+				TypeExtractor.NO_INDEX,
+				getType(),
+				Utils.getCallLocationName(),
+				true);
+
+		return process(keyedProcessFunction, outType);
+	}
+
+	/**
+	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
+	 *
+	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+	 *
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> process(
+			KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
+			TypeInformation<R> outputType) {
+
+		KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
+		return transform("KeyedProcess", outputType, operator);
+	}
 
 	// ------------------------------------------------------------------------
 	//  Windowing
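
To show how the new overloads are reached from user code, the following is a hedged wiring sketch (not from the patch) that keys a stream and applies the TimeoutReporter function sketched earlier in this message; calling process(keyedProcessFunction) without a TypeInformation argument infers the output type via TypeExtractor, as in the first overload above.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative job wiring: the call to process(...) below resolves to the new
// KeyedStream#process(KeyedProcessFunction) overload added by this commit.
public class KeyedProcessExampleJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> reported = env
				.fromElements("a", "b", "a")
				.keyBy(new KeySelector<String, String>() {
					@Override
					public String getKey(String value) {
						return value;
					}
				})
				.process(new TimeoutReporter()); // TimeoutReporter: illustrative function sketched earlier

		reported.print();
		env.execute("KeyedProcessFunction example");
	}
}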

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
new file mode 100644
index 0000000..a03480b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A keyed function that processes elements of a stream.
+ *
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
+ *
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <K> Type of the key.
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Process one element from the input stream.
+	 *
+	 * <p>This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 *
+	 * @param value The input value.
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+	 *            a {@link TimerService} for registering timers and querying the time. The
+	 *            context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key
+	 *            of the firing timer and getting a {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
+
+	/**
+	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
+	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	public abstract class Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		public abstract Long timestamp();
+
+		/**
+		 * A {@link TimerService} for querying time and registering timers.
+		 */
+		public abstract TimerService timerService();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
+	}
+
+	/**
+	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	public abstract class OnTimerContext extends Context {
+		/**
+		 * The {@link TimeDomain} of the firing timer.
+		 */
+		public abstract TimeDomain timeDomain();
+
+		/**
+		 * Get key of the firing timer.
+		 */
+		public abstract K getCurrentKey();
+	}
+
+}
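
Since the class Javadoc above notes that keyed state and timers are only available when the function is applied on a KeyedStream, here is a hedged sketch (again not part of the patch) combining ValueState with an event-time timer. The state name "count", the one-second delay, and the class name CountPerKey are assumptions for illustration.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: per-key counting with keyed state plus an event-time timer,
// emitting "<key>=<count>" when the timer fires.
public class CountPerKey extends KeyedProcessFunction<String, String, String> {

	private static final long serialVersionUID = 1L;

	private transient ValueState<Long> countState;

	@Override
	public void open(Configuration parameters) throws Exception {
		countState = getRuntimeContext().getState(
				new ValueStateDescriptor<>("count", Long.class));
	}

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		Long current = countState.value();
		countState.update(current == null ? 1L : current + 1L);

		// Fire one second (event time) after this element's timestamp, if it has one.
		if (ctx.timestamp() != null) {
			ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1000L);
		}
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
		// ctx.getCurrentKey() is the accessor introduced by this commit.
		out.collect(ctx.getCurrentKey() + "=" + countState.value());
	}
}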

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 6501a9d..b74fdf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -31,12 +31,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing keyed
- * {@link ProcessFunction ProcessFunctions}.
+ * A {@link StreamOperator} for executing {@link KeyedProcessFunction KeyedProcessFunctions}.
  */
 @Internal
 public class KeyedProcessOperator<K, IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+		extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
 
 	private static final long serialVersionUID = 1L;
@@ -47,7 +46,7 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 	private transient OnTimerContextImpl onTimerContext;
 
-	public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+	public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
 		super(function);
 
 		chainingStrategy = ChainingStrategy.ALWAYS;
@@ -70,21 +69,13 @@ public class KeyedProcessOperator<K, IN, OUT>
 	@Override
 	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
 		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
+		invokeUserFunction(TimeDomain.EVENT_TIME, timer);
 	}
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
 		collector.eraseTimestamp();
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
+		invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
 	}
 
 	@Override
@@ -95,13 +86,23 @@ public class KeyedProcessOperator<K, IN, OUT>
 		context.element = null;
 	}
 
-	private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
+	private void invokeUserFunction(
+			TimeDomain timeDomain,
+			InternalTimer<K, VoidNamespace> timer) throws Exception {
+		onTimerContext.timeDomain = timeDomain;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
 
 		private final TimerService timerService;
 
 		private StreamRecord<IN> element;
 
-		ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+		ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
 			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
@@ -132,15 +133,15 @@ public class KeyedProcessOperator<K, IN, OUT>
 		}
 	}
 
-	private class OnTimerContextImpl extends ProcessFunction<IN, OUT>.OnTimerContext{
+	private class OnTimerContextImpl extends KeyedProcessFunction<K, IN, OUT>.OnTimerContext {
 
 		private final TimerService timerService;
 
 		private TimeDomain timeDomain;
 
-		private InternalTimer<?, VoidNamespace> timer;
+		private InternalTimer<K, VoidNamespace> timer;
 
-		OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+		OnTimerContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
 			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
@@ -170,5 +171,10 @@ public class KeyedProcessOperator<K, IN, OUT>
 			checkState(timeDomain != null);
 			return timeDomain;
 		}
+
+		@Override
+		public K getCurrentKey() {
+			return timer.getKey();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
new file mode 100644
index 0000000..8481c46
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StreamOperator} for executing keyed {@link ProcessFunction ProcessFunctions}.
+ *
+ * @deprecated Replaced by {@link KeyedProcessOperator} which takes {@code KeyedProcessFunction}
+ */
+@Deprecated
+@Internal
+public class LegacyKeyedProcessOperator<K, IN, OUT>
+		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient ContextImpl context;
+
+	private transient OnTimerContextImpl onTimerContext;
+
+	public LegacyKeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+		super(function);
+
+		chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		TimerService timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl(userFunction, timerService);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		invokeUserFunction(TimeDomain.EVENT_TIME, timer);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.eraseTimestamp();
+		invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	private void invokeUserFunction(
+			TimeDomain timeDomain,
+			InternalTimer<K, VoidNamespace> timer) throws Exception {
+		onTimerContext.timeDomain = timeDomain;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<IN> element;
+
+		ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
+			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+		}
+	}
+
+	private class OnTimerContextImpl extends ProcessFunction<IN, OUT>.OnTimerContext{
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+
+		@Override
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+
+			output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index ec8a134..4fa3fc8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
@@ -61,6 +62,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -689,11 +691,11 @@ public class DataStreamTest extends TestLogger {
 	}
 
 	/**
-	 * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to
-	 * an operator.
+	 * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to an operator.
 	 */
 	@Test
-	public void testKeyedProcessTranslation() {
+	@Deprecated
+	public void testKeyedStreamProcessTranslation() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
@@ -724,12 +726,43 @@ public class DataStreamTest extends TestLogger {
 		processed.addSink(new DiscardingSink<Integer>());
 
 		assertEquals(processFunction, getFunctionForDataStream(processed));
+		assertTrue(getOperatorForDataStream(processed) instanceof LegacyKeyedProcessOperator);
+	}
+
+	/**
+	 * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated to an operator.
+	 */
+	@Test
+	public void testKeyedStreamKeyedProcessTranslation() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+		KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void processElement(Long value, Context ctx, Collector<Integer> out) throws Exception {
+				// Do nothing
+			}
+
+			@Override
+			public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
+				// Do nothing
+			}
+		};
+
+		DataStream<Integer> processed = src
+				.keyBy(new IdentityKeySelector<Long>())
+				.process(keyedProcessFunction);
+
+		processed.addSink(new DiscardingSink<Integer>());
+
+		assertEquals(keyedProcessFunction, getFunctionForDataStream(processed));
 		assertTrue(getOperatorForDataStream(processed) instanceof KeyedProcessOperator);
 	}
 
 	/**
-	 * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to
-	 * an operator.
+	 * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to an operator.
 	 */
 	@Test
 	public void testProcessTranslation() {

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index e1986f3..c5f478c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -111,8 +111,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
 	@Test
 	public void testEventTimeTimers() throws Exception {
 
+		final int expectedKey = 17;
+
 		KeyedProcessOperator<Integer, Integer, Integer> operator =
-				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, expectedKey));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -122,14 +124,14 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
 		testHarness.processWatermark(new Watermark(0));
 
-		testHarness.processElement(new StreamRecord<>(17, 42L));
+		testHarness.processElement(new StreamRecord<>(expectedKey, 42L));
 
 		testHarness.processWatermark(new Watermark(5));
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		expectedOutput.add(new Watermark(0L));
-		expectedOutput.add(new StreamRecord<>(17, 42L));
+		expectedOutput.add(new StreamRecord<>(expectedKey, 42L));
 		expectedOutput.add(new StreamRecord<>(1777, 5L));
 		expectedOutput.add(new Watermark(5L));
 
@@ -141,8 +143,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
 	@Test
 	public void testProcessingTimeTimers() throws Exception {
 
+		final int expectedKey = 17;
+
 		KeyedProcessOperator<Integer, Integer, Integer> operator =
-				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME, expectedKey));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -150,13 +154,13 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		testHarness.setup();
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(17));
+		testHarness.processElement(new StreamRecord<>(expectedKey));
 
 		testHarness.setProcessingTime(5);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
-		expectedOutput.add(new StreamRecord<>(17));
+		expectedOutput.add(new StreamRecord<>(expectedKey));
 		expectedOutput.add(new StreamRecord<>(1777));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -243,8 +247,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
 	@Test
 	public void testSnapshotAndRestore() throws Exception {
 
+		final int expectedKey = 5;
+
 		KeyedProcessOperator<Integer, Integer, String> operator =
-				new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+				new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction(expectedKey));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -252,14 +258,14 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		testHarness.setup();
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(5, 12L));
+		testHarness.processElement(new StreamRecord<>(expectedKey, 12L));
 
 		// snapshot and restore from scratch
 		OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
 		testHarness.close();
 
-		operator = new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+		operator = new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction(expectedKey));
 
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
 
@@ -283,8 +289,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
 	@Test
 	public void testNullOutputTagRefusal() throws Exception {
-		KeyedProcessOperator<Integer, Integer, String> operator =
-			new KeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());
+		KeyedProcessOperator<Integer, Integer, String> operator = new KeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(
@@ -307,8 +312,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 	 */
 	@Test
 	public void testSideOutput() throws Exception {
-		KeyedProcessOperator<Integer, Integer, String> operator =
-			new KeyedProcessOperator<>(new SideOutputProcessFunction());
+		KeyedProcessOperator<Integer, Integer, String> operator = new KeyedProcessOperator<>(new SideOutputProcessFunction());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(
@@ -346,7 +350,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
-	private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer, String> {
+	private static class NullOutputTagEmittingProcessFunction extends KeyedProcessFunction<Integer, Integer, String> {
 
 		@Override
 		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
@@ -354,7 +358,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class SideOutputProcessFunction extends ProcessFunction<Integer, String> {
+	private static class SideOutputProcessFunction extends KeyedProcessFunction<Integer, Integer, String> {
 
 		static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") {};
 		static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") {};
@@ -377,19 +381,19 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+	private static class QueryingFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
-		private final TimeDomain timeDomain;
+		private final TimeDomain expectedTimeDomain;
 
 		public QueryingFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
+			this.expectedTimeDomain = timeDomain;
 		}
 
 		@Override
 		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
 				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
 			} else {
 				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
@@ -401,23 +405,26 @@ public class KeyedProcessOperatorTest extends TestLogger {
 				long timestamp,
 				OnTimerContext ctx,
 				Collector<String> out) throws Exception {
+			// Do nothing
 		}
 	}
 
-	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
+	private static class TriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
-		private final TimeDomain timeDomain;
+		private final TimeDomain expectedTimeDomain;
+		private final Integer expectedKey;
 
-		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
+		public TriggeringFlatMapFunction(TimeDomain timeDomain, Integer expectedKey) {
+			this.expectedTimeDomain = timeDomain;
+			this.expectedKey = expectedKey;
 		}
 
 		@Override
 		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
 			out.collect(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
 				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
 			} else {
 				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
@@ -429,30 +436,30 @@ public class KeyedProcessOperatorTest extends TestLogger {
 				long timestamp,
 				OnTimerContext ctx,
 				Collector<Integer> out) throws Exception {
-
-			assertEquals(this.timeDomain, ctx.timeDomain());
+			assertEquals(expectedKey, ctx.getCurrentKey());
+			assertEquals(expectedTimeDomain, ctx.timeDomain());
 			out.collect(1777);
 		}
 	}
 
-	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
+	private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Integer> state =
 				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
 
-		private final TimeDomain timeDomain;
+		private final TimeDomain expectedTimeDomain;
 
 		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
+			this.expectedTimeDomain = timeDomain;
 		}
 
 		@Override
 		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			out.collect("INPUT:" + value);
 			getRuntimeContext().getState(state).update(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+			if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
 				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
 			} else {
 				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
@@ -464,15 +471,21 @@ public class KeyedProcessOperatorTest extends TestLogger {
 				long timestamp,
 				OnTimerContext ctx,
 				Collector<String> out) throws Exception {
-			assertEquals(this.timeDomain, ctx.timeDomain());
+			assertEquals(expectedTimeDomain, ctx.timeDomain());
 			out.collect("STATE:" + getRuntimeContext().getState(state).value());
 		}
 	}
 
-	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
+	private static class BothTriggeringFlatMapFunction extends KeyedProcessFunction<Integer, Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
+		private final Integer expectedKey;
+
+		public BothTriggeringFlatMapFunction(Integer expectedKey) {
+			this.expectedKey = expectedKey;
+		}
+
 		@Override
 		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			ctx.timerService().registerProcessingTimeTimer(5);
@@ -484,6 +497,8 @@ public class KeyedProcessOperatorTest extends TestLogger {
 				long timestamp,
 				OnTimerContext ctx,
 				Collector<String> out) throws Exception {
+			assertEquals(expectedKey, ctx.getCurrentKey());
+
 			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
 				out.collect("EVENT:1777");
 			} else {
@@ -491,5 +506,4 @@ public class KeyedProcessOperatorTest extends TestLogger {
 			}
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
new file mode 100644
index 0000000..970bb35
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link LegacyKeyedProcessOperator}.
+ */
+@Deprecated
+public class LegacyKeyedProcessOperatorTest extends TestLogger {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testTimestampAndWatermarkQuerying() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+				new LegacyKeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+				new LegacyKeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, Integer> operator =
+				new LegacyKeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(0));
+
+		testHarness.processElement(new StreamRecord<>(17, 42L));
+
+		testHarness.processWatermark(new Watermark(5));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(0L));
+		expectedOutput.add(new StreamRecord<>(17, 42L));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+		expectedOutput.add(new Watermark(5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, Integer> operator =
+				new LegacyKeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(17));
+
+		testHarness.setProcessingTime(5);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>(17));
+		expectedOutput.add(new StreamRecord<>(1777));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+				new LegacyKeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(1));
+		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark(new Watermark(2));
+		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+		testHarness.processWatermark(new Watermark(6));
+		testHarness.processWatermark(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+				new LegacyKeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17"));
+		expectedOutput.add(new StreamRecord<>("STATE:42"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+				new LegacyKeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		// snapshot and restore from scratch
+		OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
+
+		testHarness.close();
+
+		operator = new LegacyKeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777"));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testNullOutputTagRefusal() throws Exception {
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+			new LegacyKeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				operator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		try {
+			expectedException.expect(IllegalArgumentException.class);
+			testHarness.processElement(new StreamRecord<>(5));
+		} finally {
+			testHarness.close();
+		}
+	}
+
+	/**
+	 * This also verifies that the timestamps of side-emitted records are correct.
+	 */
+	@Test
+	public void testSideOutput() throws Exception {
+		LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+			new LegacyKeyedProcessOperator<>(new SideOutputProcessFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				operator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(42, 17L /* timestamp */));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("IN:42", 17L /* timestamp */));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		ConcurrentLinkedQueue<StreamRecord<Integer>> expectedIntSideOutput = new ConcurrentLinkedQueue<>();
+		expectedIntSideOutput.add(new StreamRecord<>(42, 17L /* timestamp */));
+		ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
+			testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
+		TestHarnessUtil.assertOutputEquals(
+			"Side output was not correct.",
+			expectedIntSideOutput,
+			intSideOutput);
+
+		ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongSideOutput = new ConcurrentLinkedQueue<>();
+		expectedLongSideOutput.add(new StreamRecord<>(42L, 17L /* timestamp */));
+		ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
+			testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
+		TestHarnessUtil.assertOutputEquals(
+			"Side output was not correct.",
+			expectedLongSideOutput,
+			longSideOutput);
+
+		testHarness.close();
+	}
+
+	private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer, String> {
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.output(null, value);
+		}
+	}
+
+	private static class SideOutputProcessFunction extends ProcessFunction<Integer, String> {
+
+		static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") {};
+		static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") {};
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("IN:" + value);
+
+			ctx.output(INTEGER_OUTPUT_TAG, value);
+			ctx.output(LONG_OUTPUT_TAG, value.longValue());
+		}
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public QueryingFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+			} else {
+				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+			}
+		}
+
+		@Override
+		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
+			// Do nothing
+		}
+	}
+
+	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+			out.collect(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect(1777);
+		}
+	}
+
+	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Integer> state =
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT:" + value);
+			getRuntimeContext().getState(state).update(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerProcessingTimeTimer(5);
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 49bdbd9..51def98 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -66,9 +66,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     * function, this function can also query the time and set timers. When reacting to the firing
     * of set timers the function can directly emit elements and/or register yet more timers.
     *
-    * @param processFunction The [[ProcessFunction]] that is called for each element
-    *                   in the stream.
+    * @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
+    *
+    * @deprecated Use [[KeyedStream#process(KeyedProcessFunction)]]
     */
+  @deprecated("will be removed in a future version")
   @PublicEvolving
   override def process[R: TypeInformation](
     processFunction: ProcessFunction[T, R]): DataStream[R] = {
@@ -79,7 +81,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
     asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the input stream and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
+   *                             in the stream.
+   */
+  @PublicEvolving
+  def process[R: TypeInformation](
+    keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
+
+    if (keyedProcessFunction == null) {
+      throw new NullPointerException("KeyedProcessFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.process(keyedProcessFunction, implicitly[TypeInformation[R]]))
+  }
+
   // ------------------------------------------------------------------------
   //  Windowing
   // ------------------------------------------------------------------------
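
The diff above adds the KeyedStream#process overload for KeyedProcessFunction to the Scala API. The following sketch, which is illustrative only and not taken from the commit, shows how such a function might be wired into a keyed pipeline; the object name, the sample elements, and the 100 ms timer delay are assumptions made for the example.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object KeyedProcessPipelineSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tagged: DataStream[String] = env
      .fromElements("a", "b", "a")
      .keyBy(x => x)
      .process(new KeyedProcessFunction[String, String, String] {

        override def processElement(
            value: String,
            ctx: KeyedProcessFunction[String, String, String]#Context,
            out: Collector[String]): Unit = {
          // register a processing-time timer 100 ms from now (illustrative delay)
          ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 100)
          out.collect("seen:" + value)
        }

        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
            out: Collector[String]): Unit = {
          // the key the firing timer belongs to is available directly
          out.collect("timer fired for key " + ctx.getCurrentKey)
        }
      })

    tagged.print()
    env.execute("keyed-process-function-sketch")
  }
}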

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index e2c5b41..51ec5e3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -23,13 +23,11 @@ import java.lang
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, KeyedProcessOperator, ProcessOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators._
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
@@ -430,10 +428,11 @@ class DataStreamTest extends AbstractTestBase {
   }
 
   /**
-   * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator.
+   * Verify that a [[KeyedStream.process(ProcessFunction)]] call is correctly
+   * translated to an operator.
    */
   @Test
-  def testKeyedProcessTranslation(): Unit = {
+  def testKeyedStreamProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
@@ -448,12 +447,36 @@ class DataStreamTest extends AbstractTestBase {
     val flatMapped = src.keyBy(x => x).process(processFunction)
 
     assert(processFunction == getFunctionForDataStream(flatMapped))
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_, _, _]])
+  }
+
+  /**
+   * Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is correctly
+   * translated to an operator.
+   */
+  @Test
+  def testKeyedStreamKeyedProcessTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val keyedProcessFunction = new KeyedProcessFunction[Long, Long, Int] {
+      override def processElement(
+                                   value: Long,
+                                   ctx: KeyedProcessFunction[Long, Long, Int]#Context,
+                                   out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.keyBy(x => x).process(keyedProcessFunction)
+
+    assert(keyedProcessFunction == getFunctionForDataStream(flatMapped))
     assert(getOperatorForDataStream(flatMapped).isInstanceOf[KeyedProcessOperator[_, _, _]])
   }
 
   /**
-    * Verify that a [[DataStream.process()]] call is correctly translated to an operator.
-    */
+   * Verify that a [[DataStream.process(ProcessFunction)]] call is correctly
+   * translated to an operator.
+   */
   @Test
   def testProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -473,7 +496,6 @@ class DataStreamTest extends AbstractTestBase {
     assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
   }
 
-
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
@@ -688,5 +710,4 @@ class DataStreamTest extends AbstractTestBase {
     m.print()
     m.getId
   }
-
 }

