flink-commits mailing list archives

From: se...@apache.org
Subject: [04/10] flink git commit: [FLINK-3578] [streaming scala] Support RichFunctions for Scala WindowFunction and AllWindowFunction
Date: Fri, 04 Mar 2016 23:25:33 GMT
[FLINK-3578] [streaming scala] Support RichFunctions for Scala WindowFunction and AllWindowFunction

This also consolidates the various wrapper classes for fold() and reduce() functions.

This closes #1765
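
[Editor's note: a minimal usage sketch, not part of this commit, of what the change enables in the Scala DataStream API. A user-defined RichWindowFunction can now be passed to WindowedStream.apply(), and the new ScalaWindowFunctionWrapper forwards open(), close() and the RuntimeContext to it. The class and field names below are illustrative.]

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.function.RichWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Illustrative rich window function: counts elements per key and window and
    // uses the RuntimeContext that the new wrapper makes available.
    class CountPerWindow extends RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {

      private var subtask: Int = 0

      override def open(parameters: Configuration): Unit = {
        // setup hook, forwarded by ScalaWindowFunctionWrapper before the first window fires
        subtask = getRuntimeContext.getIndexOfThisSubtask
      }

      override def apply(key: String, window: TimeWindow,
                         input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        out.collect((key, input.size))
      }

      override def close(): Unit = {
        // tear-down hook, also forwarded by the wrapper
      }
    }

    // Usage (sketch):
    //   stream.keyBy(_._1)
    //     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    //     .apply(new CountPerWindow)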


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bdac1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bdac1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bdac1af

Branch: refs/heads/master
Commit: 7bdac1afa0e05d0c39b2521c3472c6b3eebb4f45
Parents: bb62ab0
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Mar 4 12:23:45 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 4 20:58:32 2016 +0100

----------------------------------------------------------------------
 .../functions/windowing/RichWindowFunction.java |  17 +-
 .../InternalIterableWindowFunction.java         |   2 +-
 .../streaming/api/scala/AllWindowedStream.scala | 111 +++-----
 .../streaming/api/scala/WindowedStream.scala    |  95 +++----
 .../api/scala/function/AllWindowFunction.scala  |  11 +-
 .../scala/function/RichAllWindowFunction.scala  |  37 +++
 .../api/scala/function/RichWindowFunction.scala |  38 +++
 .../function/util/ScalaAllWindowFunction.scala  |  38 +++
 .../util/ScalaAllWindowFunctionWrapper.scala    |  68 +++++
 .../scala/function/util/ScalaFoldFunction.scala |  33 +++
 .../function/util/ScalaReduceFunction.scala     |  33 +++
 .../function/util/ScalaWindowFunction.scala     |  38 +++
 .../util/ScalaWindowFunctionWrapper.scala       |  68 +++++
 .../streaming/api/scala/WindowFoldITCase.scala  | 123 ++++++++-
 .../api/scala/WindowFunctionITCase.scala        | 157 +++++++++++
 .../api/scala/WindowReduceITCase.scala          | 276 +++++++++++++++++++
 .../CheckingIdentityRichAllWindowFunction.scala |  84 ++++++
 .../CheckingIdentityRichWindowFunction.scala    |  81 ++++++
 18 files changed, 1168 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
index 18ad19a..2ed271f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * Rich variant of the {@link WindowFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and tear-down methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
 @Public
 public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 32318ea..7b441fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
 public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
 	private static final long serialVersionUID = 1L;
 
-	protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+	protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
 
 	public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		this.wrappedFunction = wrappedFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index dcb9822..020c619 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -24,15 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
 import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-import scala.collection.JavaConverters._
-
 /**
  * A [[AllWindowedStream]] represents a data stream where the stream of
  * elements is split into windows based on a
@@ -56,7 +54,7 @@ import scala.collection.JavaConverters._
  */
 @Public
 class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
-
+  
   /**
    * Sets the [[Trigger]] that should be used to trigger window emission.
    */
@@ -121,9 +119,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       throw new NullPointerException("Reduce function must not be null.")
     }
     val cleanFun = clean(function)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
+    val reducer = new ScalaReduceFunction[T](cleanFun)
+    
     reduce(reducer)
   }
 
@@ -160,11 +157,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       throw new NullPointerException("Fold function must not be null.")
     }
     val cleanFun = clean(function)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
+    val folder = new ScalaFoldFunction[T,R](cleanFun)
+    
     fold(initialValue, folder)
   }
 
@@ -183,11 +177,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val javaFunction = new JAllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(window, elements.asScala, out)
-      }
-    }
+    val javaFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedFunction)
+    
     asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
   }
 
@@ -206,11 +197,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val applyFunction = new JAllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(window, elements.asScala, out)
-      }
-    }
+    val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanedFunction)
+    
     asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
   }
 
@@ -222,22 +210,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
    *
    * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
+   * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
   def apply[R: TypeInformation](
       preAggregator: ReduceFunction[T],
-      function: AllWindowFunction[T, R, W]): DataStream[R] = {
+      windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = {
 
-    val cleanedFunction = clean(function)
-    val applyFunction = new JAllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(window, elements.asScala, out)
-      }
-    }
+    val cleanedReducer = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+    
+    val applyFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction)
 
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType))
+    asScalaStream(javaStream.apply(cleanedReducer, applyFunction, returnType))
   }
 
   /**
@@ -248,30 +234,25 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
    *
    * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
+   * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
-    if (function == null) {
+      windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    
+    if (preAggregator == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    if (function == null) {
+    if (windowFunction == null) {
       throw new NullPointerException("WindowApply function must not be null.")
     }
 
     val cleanReducer = clean(preAggregator)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new JAllWindowFunction[T, R, W] {
-      def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanApply(window, input.asScala, out)
-      }
-    }
+    val cleanWindowFunction = clean(windowFunction)
+    
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanWindowFunction)
     
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
     asScalaStream(javaStream.apply(reducer, applyFunction, returnType))
@@ -286,24 +267,22 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     *
     * @param initialValue Initial value of the fold
     * @param preAggregator The reduce function that is used for pre-aggregation
-    * @param function The window function.
+    * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: FoldFunction[T, R],
-      function: AllWindowFunction[R, R, W]): DataStream[R] = {
+      windowFunction: AllWindowFunction[R, R, W]): DataStream[R] = {
 
-    val cleanedFunction = clean(function)
-    val applyFunction = new JAllWindowFunction[R, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
-        cleanedFunction(window, elements.asScala, out)
-      }
-    }
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+    
+    val applyFunction = new ScalaAllWindowFunctionWrapper[R, R, W](cleanWindowFunction)
     
     asScalaStream(javaStream.apply(
       initialValue,
-      clean(preAggregator),
+      cleanFolder,
       applyFunction,
       implicitly[TypeInformation[R]]))
   }
@@ -317,31 +296,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     *
     * @param initialValue Initial value of the fold
     * @param preAggregator The reduce function that is used for pre-aggregation
-    * @param function The window function.
+    * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: (R, T) => R,
-      function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
-    if (function == null) {
+      windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
+    
+    if (preAggregator == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    if (function == null) {
+    if (windowFunction == null) {
       throw new NullPointerException("WindowApply function must not be null.")
     }
 
     val cleanFolder = clean(preAggregator)
-    val folder = new FoldFunction[T, R] {
-      def fold(v1: R, v2: T) = { cleanFolder(v1, v2) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new JAllWindowFunction[R, R, W] {
-      def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
-        cleanApply(window, input.asScala, out)
-      }
-    }
+    val cleanWindowFunction = clean(windowFunction)
+    
+    val folder = new ScalaFoldFunction[T, R](cleanFolder)
+    val applyFunction = new ScalaAllWindowFunction[R, R, W](cleanWindowFunction)
+    
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
     asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index bea578e..773829e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -25,14 +25,12 @@ import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction}
+import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-import scala.collection.JavaConverters._
-
 /**
  * A [[WindowedStream]] represents a data stream where elements are grouped by
  * key, and for each key, the stream of elements is split into windows based on a
@@ -124,9 +122,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       throw new NullPointerException("Reduce function must not be null.")
     }
     val cleanFun = clean(function)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
+    val reducer = new ScalaReduceFunction[T](cleanFun)
     reduce(reducer)
   }
 
@@ -163,11 +159,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       throw new NullPointerException("Fold function must not be null.")
     }
     val cleanFun = clean(function)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
+    val folder = new ScalaFoldFunction[T, R](cleanFun)
     fold(initialValue, folder)
   }
 
@@ -186,13 +178,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
     
     val cleanFunction = clean(function)
-    val javaFunction = new JWindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = {
-        cleanFunction.apply(key, window, input.asScala, out)
-      }
-    }
-
-    asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
+    val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
+    asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -213,11 +200,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     }
 
     val cleanedFunction = clean(function)
-    val applyFunction = new JWindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(key, window, elements.asScala, out)
-      }
-    }
+    val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanedFunction)
+    
     asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
   }
 
@@ -236,16 +220,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
 
-    val cleanedFunction = clean(function)
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
 
-    val applyFunction = new JWindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction.apply(key, window, elements.asScala, out)
-      }
-    }
+    val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
 
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType))
+    asScalaStream(javaStream.apply(cleanedPreAggregator, applyFunction, resultType))
   }
 
   /**
@@ -256,31 +237,25 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
    *
    * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
+   * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+      windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
-    if (function == null) {
+    if (preAggregator == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    if (function == null) {
+    if (windowFunction == null) {
       throw new NullPointerException("WindowApply function must not be null.")
     }
 
     val cleanReducer = clean(preAggregator)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new JWindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanApply(key, window, input.asScala, out)
-      }
-    }
+    val cleanWindowFunction = clean(windowFunction)
+    
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanWindowFunction)
     
     asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]))
   }
@@ -303,16 +278,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       function: WindowFunction[R, R, K, W]): DataStream[R] = {
 
     val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
 
-    val applyFunction = new JWindowFunction[R, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
-        cleanedFunction.apply(key, window, elements.asScala, out)
-      }
-    }
+    val applyFunction = new ScalaWindowFunctionWrapper[R, R, K, W](cleanedFunction)
 
     asScalaStream(javaStream.apply(
       initialValue,
-      clean(foldFunction),
+      cleanedFoldFunction,
       applyFunction,
       implicitly[TypeInformation[R]]))
   }
@@ -325,32 +297,27 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * Arriving data is incrementally aggregated using the given fold function.
     *
     * @param foldFunction The fold function that is used for incremental aggregation
-    * @param function The window function.
+    * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
   def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: (R, T) => R,
-      function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
+      windowFunction: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
     
-    if (function == null) {
+    if (foldFunction == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    if (function == null) {
+    if (windowFunction == null) {
       throw new NullPointerException("WindowApply function must not be null.")
     }
 
     val cleanFolder = clean(foldFunction)
-    val folder = new FoldFunction[T, R] {
-      def fold(acc: R, v: T) = { cleanFolder(acc, v) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new JWindowFunction[R, R, K, W] {
-      def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
-        cleanApply(key, window, input.asScala, out)
-      }
-    }
+    val cleanWindowFunction = clean(windowFunction)
+    
+    val folder = new ScalaFoldFunction[T, R](cleanFolder)
+    val applyFunction = new ScalaWindowFunction[R, R, K, W](cleanWindowFunction)
+    
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
     asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
index 4e77d83..7677459 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
@@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
 /**
-  * Base interface for functions that are evaluated over keyed (grouped) windows.
-  *
-  * @tparam IN The type of the input value.
-  * @tparam OUT The type of the output value.
-  */
+ * Base interface for functions that are evaluated over non-grouped windows,
+ * i.e., windows over all stream partitions.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ */
 @Public
 trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala
new file mode 100644
index 0000000..4ff1fe9
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import org.apache.flink.api.common.functions.AbstractRichFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Rich variant of the [[org.apache.flink.streaming.api.scala.function.AllWindowFunction]].
+ *
+ * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the
+ * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup
+ * and tear-down methods.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam W The type of Window that this window function can be applied on.
+ */
+abstract class RichAllWindowFunction[IN, OUT, W <: Window]
+  extends AbstractRichFunction
+  with AllWindowFunction[IN, OUT, W] {}
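
[Editor's note: a corresponding sketch, not part of the commit, for the non-keyed case. It works the same way through AllWindowedStream.apply() and the new ScalaAllWindowFunctionWrapper; names below are illustrative.]

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Illustrative: sums the Int field of all elements in each non-keyed window.
    class SumAllWindows extends RichAllWindowFunction[(String, Int), Int, TimeWindow] {

      private var subtask: Int = 0

      override def open(parameters: Configuration): Unit = {
        // RuntimeContext access and setup, forwarded by ScalaAllWindowFunctionWrapper
        subtask = getRuntimeContext.getIndexOfThisSubtask
      }

      override def apply(window: TimeWindow, input: Iterable[(String, Int)], out: Collector[Int]): Unit = {
        out.collect(input.map(_._2).sum)
      }
    }

    // Usage (sketch):
    //   stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new SumAllWindows)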

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala
new file mode 100644
index 0000000..72d44e7
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import org.apache.flink.api.common.functions.AbstractRichFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Rich variant of the [[org.apache.flink.streaming.api.scala.function.WindowFunction]].
+ * 
+ * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the
+ * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup
+ * and tear-down methods.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam KEY The type of the key.
+ * @tparam W The type of Window that this window function can be applied on.
+ */
+abstract class RichWindowFunction[IN, OUT, KEY, W <: Window] 
+  extends AbstractRichFunction
+  with WindowFunction[IN, OUT, KEY, W] {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala
new file mode 100644
index 0000000..943e83f
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala Function3 as a Java AllWindowFunction.
+ */
+final class ScalaAllWindowFunction[IN, OUT, W <: Window](
+        private[this] val function: (W, Iterable[IN], Collector[OUT]) => Unit)
+    extends JAllWindowFunction[IN, OUT, W] {
+  
+  @throws(classOf[Exception])
+  override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) {
+    function.apply(window, input.asScala, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala
new file mode 100644
index 0000000..39142c2
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala WindowFunction as a JavaWindow function.
+ * 
+ * The Scala and Java Window functions differ in their type of "Iterable":
+ *   - Scala WindowFunction: scala.Iterable
+ *   - Java WindowFunction: java.lang.Iterable
+ */
+final class ScalaAllWindowFunctionWrapper[IN, OUT, W <: Window](
+        private[this] val func: AllWindowFunction[IN, OUT, W])
+    extends JAllWindowFunction[IN, OUT, W] with RichFunction {
+  
+  @throws(classOf[Exception])
+  override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) {
+    func.apply(window, input.asScala, out)
+  }
+
+  @throws(classOf[Exception])
+  override def open(parameters: Configuration) {
+    FunctionUtils.openFunction(func, parameters)
+  }
+
+  @throws(classOf[Exception])
+  override def close() {
+    FunctionUtils.closeFunction(func)
+  }
+
+  override def setRuntimeContext(t: RuntimeContext) {
+    FunctionUtils.setFunctionRuntimeContext(func, t)
+  }
+
+  override def getRuntimeContext(): RuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+
+  override def getIterationRuntimeContext(): IterationRuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala
new file mode 100644
index 0000000..f85ddd0
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.FoldFunction
+
+/**
+ * A wrapper function that exposes a Scala Function2 as a [[FoldFunction]].
+ */
+final class ScalaFoldFunction[T, R](private[this] val foldFunction: (R, T) => R)
+    extends FoldFunction[T, R] {
+  
+  @throws(classOf[Exception])
+  override def fold(accumulator: R, value: T): R = {
+    foldFunction(accumulator, value)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala
new file mode 100644
index 0000000..027ccb2
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.ReduceFunction
+
+/**
+ * A wrapper function that exposes a Scala Function2 as a [[ReduceFunction]].
+ */
+final class ScalaReduceFunction[T](private[this] val function: (T, T) => T)
+    extends ReduceFunction[T] {
+  
+  @throws(classOf[Exception])
+  override def reduce(a: T, b: T): T = {
+    function(a, b)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala
new file mode 100644
index 0000000..3b4cb9d
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala Function4 as a Java WindowFunction.
+ */
+final class ScalaWindowFunction[IN, OUT, KEY, W <: Window](
+        private[this] val function: (KEY, W, Iterable[IN], Collector[OUT]) => Unit)
+    extends JWindowFunction[IN, OUT, KEY, W] {
+  
+  @throws(classOf[Exception])
+  override def apply(key: KEY, window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) {
+    function.apply(key, window, input.asScala, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala
new file mode 100644
index 0000000..1d74b6c
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function.util
+
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext, RichFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.{ WindowFunction => JWindowFunction }
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A wrapper function that exposes a Scala WindowFunction as a JavaWindow function.
+ * 
+ * The Scala and Java Window functions differ in their type of "Iterable":
+ *   - Scala WindowFunction: scala.Iterable
+ *   - Java WindowFunction: java.lang.Iterable
+ */
+final class ScalaWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
+        private[this] val func: WindowFunction[IN, OUT, KEY, W])
+    extends JWindowFunction[IN, OUT, KEY, W] with RichFunction {
+  
+  @throws(classOf[Exception])
+  override def apply(key: KEY, window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) {
+    func.apply(key, window, input.asScala, out)
+  }
+
+  @throws(classOf[Exception])
+  override def open(parameters: Configuration) {
+    FunctionUtils.openFunction(func, parameters)
+  }
+
+  @throws(classOf[Exception])
+  override def close() {
+    FunctionUtils.closeFunction(func)
+  }
+
+  override def setRuntimeContext(t: RuntimeContext) {
+    FunctionUtils.setFunctionRuntimeContext(func, t)
+  }
+
+  override def getRuntimeContext(): RuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+
+  override def getIterationRuntimeContext(): IterationRuntimeContext = {
+    throw new RuntimeException("This should never be called")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 7833651..6a6a956 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -20,13 +20,17 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.api.common.functions.FoldFunction
+import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks}
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Test
 import org.junit.Assert._
@@ -87,9 +91,68 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  def testFoldAllWindow(): Unit = {
+  def testFoldWithWindowFunction(): Unit = {
     WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichWindowFunction.reset()
 
+    val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
+      override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
+        (accumulator._1 + value._1, accumulator._2 + value._2)
+      }
+    }
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+    
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        ("R:", 0),
+        foldFunc,
+        new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(R:aaa,3)",
+      "(R:aaa,21)",
+      "(R:bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
+  def testFoldAllWindow(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
@@ -131,6 +194,62 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
   }
 
+  @Test
+  def testFoldAllWithWindowFunction(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichAllWindowFunction.reset()
+    
+    val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
+      override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
+        (accumulator._1 + value._1, accumulator._2 + value._2)
+      }
+    }
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        ("R:", 0),
+        foldFunc,
+        new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFoldITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(R:aaa,3)",
+      "(R:bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+    CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
new file mode 100644
index 0000000..c38f422
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class WindowFunctionITCase {
+
+  @Test
+  def testRichWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichWindowFunction.reset()
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("RichWindowFunction Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
+  def testRichAllWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichAllWindowFunction.reset()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {}
+      
+    }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
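+    // Non-keyed variant: the rich identity all-window function is verified the same way.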
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("RichAllWindowFunction Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+    CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
+  }
+}
+
+object WindowFunctionITCase {
+
+  private var testResults: mutable.MutableList[String] = null
+
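+  // Assigns each element's Int field as its event timestamp and punctuates a watermark
+  // at (timestamp - 1) after every element, so windows fire progressively as data arrives.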
+  private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] {
+
+    private var currentTimestamp = -1L
+
+    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
+      currentTimestamp = element._2
+      currentTimestamp
+    }
+
+    def checkAndGetNextWatermark(
+        lastElement: (String, Int),
+        extractedTimestamp: Long): Watermark = {
+      new Watermark(lastElement._2 - 1)
+    }
+  }
+}
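
A minimal sketch (not part of this patch) of how a user-defined rich window function, assuming
the Scala RichWindowFunction introduced by this commit, could look and be applied on a keyed
stream; the class and field names below are illustrative only:

    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.function.RichWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Counts the elements of each window; open() has access to the runtime context
    // because the function is rich.
    class CountPerWindow extends RichWindowFunction[(String, Int), (String, Int), Tuple, TimeWindow] {

      override def open(conf: Configuration): Unit = {
        // e.g. getRuntimeContext.getIndexOfThisSubtask is available here
      }

      override def apply(key: Tuple, window: TimeWindow,
                         input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        out.collect((key.getField[String](0), input.size))
      }
    }

It would be applied exactly like the identity function in the test above, i.e.
stream.keyBy(0).window(...).apply(new CountPerWindow()).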

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
new file mode 100644
index 0000000..ffd94fc
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests for reduce() over keyed windows and all-windows, both as a plain pre-aggregation
+ * and combined with an additional (rich) window function.
+ */
+class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testReduceWindow(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce Window Test")
+
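+    // per-key 3 ms tumbling windows:
+    //   a in [0,3) -> (aaa,3), b in [3,6) -> (bbb,12), a in [6,9) -> (aaa,21)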
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(aaa,21)",
+      "(bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+  }
+
+  @Test
+  def testReduceWithWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichWindowFunction.reset()
+
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+    
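+    // reduceFunc pre-aggregates incrementally; the rich window function then receives
+    // a single reduced value per key and window.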
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        reduceFunc,
+        new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(aaa,21)",
+      "(bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
+  def testReduceAllWindow(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold All-Window Test")
+
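+    // non-keyed 3 ms windows: [0,3) -> (aaa,3); [3,6) reduces b3,a3,b4,a4,b5,a5 -> (bababa,24)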
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+  }
+
+  @Test
+  def testReduceAllWithWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichAllWindowFunction.reset()
+
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
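+    // Same as above, but the pre-reduced value is additionally passed through the
+    // rich identity all-window function to verify its lifecycle methods.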
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        reduceFunc,
+        new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Fold All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
+  }
+}
+
+object WindowReduceITCase {
+  
+  private var testResults: mutable.MutableList[String] = null
+
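+  // Same extractor as in WindowFunctionITCase: the Int field is the event timestamp
+  // and a watermark at (timestamp - 1) is emitted after every element.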
+  private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] {
+
+    private var currentTimestamp = -1L
+
+    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
+      currentTimestamp = element._2
+      currentTimestamp
+    }
+
+    def checkAndGetNextWatermark(
+        lastElement: (String, Int),
+        extractedTimestamp: Long): Watermark = {
+      new Watermark(lastElement._2 - 1)
+    }
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala
new file mode 100644
index 0000000..acd8b12
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichAllWindowFunction[T, W <: Window]
+  extends RichAllWindowFunction[T, T, W] {
+  
+  override def apply(window: W, input: scala.Iterable[T], out: Collector[T]): Unit = {
+    for (value <- input) {
+      out.collect(value)
+    }
+  }
+  
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichAllWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichAllWindowFunction.closeCalled = true
+  }
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichAllWindowFunction.contextSet = true
+  }
+}
+
+
+object CheckingIdentityRichAllWindowFunction {
+
+  @volatile
+  private[CheckingIdentityRichAllWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichAllWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichAllWindowFunction] var contextSet = false
+
+  def reset(): Unit = {
+    closeCalled = false
+    openCalled = false
+    contextSet = false
+  }
+
+  def checkRichMethodCalls(): Unit = {
+    if (!contextSet) {
+      throw new AssertionError("context not set")
+    }
+    if (!openCalled) {
+      throw new AssertionError("open() not called")
+    }
+    if (!closeCalled) {
+      throw new AssertionError("close() not called")
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7bdac1af/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala
new file mode 100644
index 0000000..e0de0de
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichWindowFunction[T, K, W <: Window]
+  extends RichWindowFunction[T, T, K, W] {
+  
+  override def apply(key: K, window: W, input: scala.Iterable[T], out: Collector[T]): Unit = {
+    for (value <- input) {
+      out.collect(value)
+    }
+  }
+  
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichWindowFunction.closeCalled = true
+  }
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichWindowFunction.contextSet = true
+  }
+}
+
+object CheckingIdentityRichWindowFunction {
+  
+  @volatile
+  private[CheckingIdentityRichWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichWindowFunction] var contextSet = false
+  
+  def reset(): Unit = {
+    closeCalled = false
+    openCalled = false
+    contextSet = false
+  }
+  
+  def checkRichMethodCalls(): Unit = {
+    if (!contextSet) {
+      throw new AssertionError("context not set")
+    }
+    if (!openCalled) {
+      throw new AssertionError("open() not called")
+    }
+    if (!closeCalled) {
+      throw new AssertionError("close() not called")
+    }
+  }
+}

