flink-commits mailing list archives

From aljos...@apache.org
Subject [03/13] flink git commit: Add Scala API for new Windowing
Date Mon, 05 Oct 2015 14:42:37 GMT
Add Scala API for new Windowing

This adds window/timeWindow to KeyedStream along with windowAll/timeWindowAll
on DataStream.

The added API classes are AllWindowedStream and WindowedStream.

This also adds translation tests similar to those for the Java API:
 - AllWindowTranslationTest.scala
 - WindowTranslationTest.scala
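For illustration, a minimal end-to-end sketch of the new Scala API (not part of this
commit). Element values and window parameters mirror those used in the translation
tests below; everything else, including the object name and the summing reducer, is
made up for the example:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}

object WindowApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("hello", 1), ("hello", 2))

    // Keyed windows: keyBy, then the new window() on KeyedStream.
    val keyedSums: DataStream[(String, Int)] = source
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(1000))
      .reduceWindow(new ReduceFunction[(String, Int)] {
        def reduce(a: (String, Int), b: (String, Int)): (String, Int) = (a._1, a._2 + b._2)
      })

    // Non-keyed windows: the new windowAll() on DataStream.
    val allSums: DataStream[(String, Int)] = source
      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
      .reduceWindow(new ReduceFunction[(String, Int)] {
        def reduce(a: (String, Int), b: (String, Int)): (String, Int) = (a._1, a._2 + b._2)
      })

    keyedSums.print()
    allSums.print()
    env.execute("window-api-sketch")
  }
}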


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0c9e78f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0c9e78f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0c9e78f

Branch: refs/heads/master
Commit: d0c9e78fdbcfe60db4bcfc9e6c2d4ba70fa00935
Parents: 9baadfe
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Oct 1 21:23:56 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |   4 +
 .../api/datastream/WindowedStream.java          |   5 +
 .../streaming/api/scala/AllWindowedStream.scala | 123 ++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |  84 +++++++-
 .../flink/streaming/api/scala/KeyedStream.scala |  91 ++++++++-
 .../streaming/api/scala/WindowedStream.scala    | 126 ++++++++++++
 .../api/scala/AllWindowTranslationTest.scala    | 192 +++++++++++++++++++
 .../StreamingScalaAPICompletenessTest.scala     |  24 +++
 .../api/scala/WindowTranslationTest.scala       | 185 ++++++++++++++++++
 9 files changed, 830 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index e5c7c18..134029f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -228,4 +229,7 @@ public class AllWindowedStream<T, W extends Window> {
 		return null;
 	}
 
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return input.getExecutionEnvironment();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 16898dd..41adab5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -299,4 +300,8 @@ public class WindowedStream<T, K, W extends Window> {
 
 		return null;
 	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return input.getExecutionEnvironment();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
new file mode 100644
index 0000000..4f36722
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+import scala.reflect.ClassTag
+
+/**
+ * A [[AllWindowedStream]] represents a data stream where the stream of
+ * elements is split into windows based on a
+ * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
+ * is triggered based on a [[Trigger]].
+ *
+ * If an [[Evictor]] is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the [[Trigger]] but before the actual evaluation of the window.
+ * When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * Note that the [[AllWindowedStream]] is purely an API construct; during runtime
+ * the [[AllWindowedStream]] will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @tparam T The type of elements in the stream.
+ * @tparam W The type of [[Window]] that the
+ *           [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
+ *           assigns the elements to.
+ */
+class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
+
+  /**
+   * Sets the [[Trigger]] that should be used to trigger window emission.
+   */
+  def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
+    javaStream.trigger(trigger)
+    this
+  }
+
+  /**
+   * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+   *
+   * Note: When using an evictor, window performance will degrade significantly, since
+   * pre-aggregation of window results cannot be used.
+   */
+  def evictor(evictor: Evictor[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
+    javaStream.evictor(evictor)
+    this
+  }
+
+  // ------------------------------------------------------------------------
+  //  Operations on the windows
+  // ------------------------------------------------------------------------
+  /**
+   * Applies a reduce function to the window. The window function is called for each evaluation
+   * of the window. The output of the reduce function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try to pre-aggregate data as much as the window policies permit. For
+   * example, tumbling time windows can perfectly pre-aggregate the data, meaning that only one
+   * element per window is stored. Sliding time windows will pre-aggregate on the granularity of
+   * the slide interval, so a few elements are stored (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce function to the window.
+   */
+  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduceWindow(clean(function))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, W]): DataStream[R] = {
+    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
+  }
+
+
+  // ------------------------------------------------------------------------
+  //  Utilities
+  // ------------------------------------------------------------------------
+
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+  }
+
+}
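As a usage illustration only (not part of this diff), the trigger/evictor/apply methods of
the new Scala AllWindowedStream can be chained as sketched below. It follows the pattern
exercised by AllWindowTranslationTest further down; the element-counting window function
and the object name are invented for the example:

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AllWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("hello", 1), ("hello", 2))

    // Fire every 100 elements, keep at most the last 1000, and emit the window size.
    val windowSizes: DataStream[Int] = source
      .windowAll(TumblingProcessingTimeWindows.of(1000))
      .trigger(CountTrigger.of(100))
      .evictor(CountEvictor.of(1000))
      .apply(new AllWindowFunction[(String, Int), Int, TimeWindow]() {
        def apply(window: TimeWindow,
                  values: java.lang.Iterable[(String, Int)],
                  out: Collector[Int]): Unit = {
          var count = 0
          val it = values.iterator()
          while (it.hasNext) { it.next(); count += 1 }
          out.collect(count)
        }
      })

    windowSizes.print()
    env.execute("all-window-sketch")
  }
}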

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 07828db..7dfaeef 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -19,6 +19,10 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
+import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -31,7 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator, KeyedDataStream}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
@@ -610,6 +614,82 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.every(windowingHelper)
 
   /**
+   * Windows this DataStream into tumbling time windows.
+   *
+   * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
+   * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
+   * set using
+   * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
+   *
+   * @param size The size of the window.
+   */
+  def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
+    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+
+    actualSize match {
+      case t: EventTime =>
+        val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+          .asInstanceOf[WindowAssigner[T, TimeWindow]]
+        windowAll(assigner)
+      case t: ProcessingTime =>
+        val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+          .asInstanceOf[WindowAssigner[T, TimeWindow]]
+        windowAll(assigner)
+      case _ => throw new RuntimeException("Invalid time: " + actualSize)
+    }
+  }
+
+  /**
+   * Windows this DataStream into sliding time windows.
+   *
+   * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
+   * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic
+   * set using
+   * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
+   *
+   * @param size The size of the window.
+   */
+  def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
+    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+    val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+
+    actualSize match {
+      case t: EventTime =>
+        val assigner = SlidingTimeWindows.of(
+          actualSize.toMilliseconds,
+          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+        windowAll(assigner)
+      case t: ProcessingTime =>
+        val assigner = SlidingProcessingTimeWindows.of(
+          actualSize.toMilliseconds,
+          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+        windowAll(assigner)
+      case _ => throw new RuntimeException("Invalid time: " + actualSize)
+    }
+  }
+
+  /**
+   * Windows this data stream to an [[AllWindowedStream]], which evaluates windows
+   * over the whole, non-keyed stream. Elements are put into windows by a [[WindowAssigner]].
+   * The grouping of elements is done by window.
+   *
+   * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
+   * when windows are evaluated. However, each `WindowAssigner` comes with a default `Trigger`
+   * that is used if no `Trigger` is specified.
+   *
+   * Note: This operation is inherently non-parallel since all elements have to pass through
+   * the same operator instance. (Only for special cases, such as aligned time windows, is
+   * it possible to perform this operation in parallel.)
+   *
+   * @param assigner The `WindowAssigner` that assigns elements to windows.
+   * @return The trigger windows data stream.
+   */
+  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
+    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, assigner))
+  }
+  /**
    * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
    * The internal timestamps are, for example, used to to event-time window operations.
    *
@@ -780,7 +860,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
   private[flink] def clean[F <: AnyRef](f: F): F = {
     new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 25244cd..232e4bb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream, DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{KeyedStream => KeyedJavaStream, DataStream => JavaStream, WindowedStream => WindowedJavaStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
+import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.FoldFunction
@@ -30,7 +33,91 @@ import org.apache.flink.api.common.functions.ReduceFunction
 
 
 class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
- 
+
+  // ------------------------------------------------------------------------
+  //  Windowing
+  // ------------------------------------------------------------------------
+
+  /**
+   * Windows this [[KeyedStream]] into tumbling time windows.
+   *
+   * <p>
+   * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
+   * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
+   * set using
+   * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
+   *
+   * @param size The size of the window.
+   */
+  def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
+    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+
+    actualSize match {
+      case t: EventTime =>
+        val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+          .asInstanceOf[WindowAssigner[T, TimeWindow]]
+        window(assigner)
+      case t: ProcessingTime =>
+        val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+          .asInstanceOf[WindowAssigner[T, TimeWindow]]
+        window(assigner)
+      case _ => throw new RuntimeException("Invalid time: " + actualSize)
+    }
+  }
+
+  /**
+   * Windows this [[KeyedStream]] into sliding time windows.
+   *
+   * <p>
+   * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
+   * `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic
+   * set using
+   * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
+   *
+   * @param size The size of the window.
+   */
+  def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
+    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+    val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
+
+    actualSize match {
+      case t: EventTime =>
+        val assigner = SlidingTimeWindows.of(
+          actualSize.toMilliseconds,
+          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+        window(assigner)
+      case t: ProcessingTime =>
+        val assigner = SlidingProcessingTimeWindows.of(
+          actualSize.toMilliseconds,
+          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+        window(assigner)
+      case _ => throw new RuntimeException("Invalid time: " + actualSize)
+    }
+  }
+
+  /**
+   * Windows this data stream to a [[WindowedStream]], which evaluates windows
+   * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The
+   * grouping of elements is done both by key and by window.
+   *
+   * <p>
+   * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
+   * when windows are evaluated. However, each `WindowAssigner` comes with a default `Trigger`
+   * that is used if no `Trigger` is specified.
+   *
+   * @param assigner The `WindowAssigner` that assigns elements to windows.
+   * @return The trigger windows data stream.
+   */
+  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
+    new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
+  }
+
+  // ------------------------------------------------------------------------
+  //  Non-Windowed aggregation operations
+  // ------------------------------------------------------------------------
+
   /**
    * Creates a new [[DataStream]] by reducing the elements of this DataStream
    * using an associative reduce function. An independent aggregate is kept per key.

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
new file mode 100644
index 0000000..a688846
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+import scala.reflect.ClassTag
+
+/**
+ * A [[WindowedStream]] represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
+ * is triggered based on a [[Trigger]].
+ *
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
+ * If an [[org.apache.flink.streaming.api.windowing.evictors.Evictor]] is specified it will
+ * be used to evict elements from the window after evaluation was triggered by the [[Trigger]]
+ * but before the actual evaluation of the window. When using an evictor, window performance will
+ * degrade significantly, since pre-aggregation of window results cannot be used.
+ *
+ * Note that the [[WindowedStream]] is purely an API construct; during runtime
+ * the [[WindowedStream]] will be collapsed together with the
+ * [[KeyedStream]] and the operation over the window into one single operation.
+ *
+ * @tparam T The type of elements in the stream.
+ * @tparam K The type of the key by which elements are grouped.
+ * @tparam W The type of [[Window]] that the
+ *           [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
+ *           assigns the elements to.
+ */
+class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
+
+  /**
+   * Sets the [[Trigger]] that should be used to trigger window emission.
+   */
+  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
+    javaStream.trigger(trigger)
+    this
+  }
+
+  /**
+   * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+   *
+   * Note: When using an evictor, window performance will degrade significantly, since
+   * pre-aggregation of window results cannot be used.
+   */
+  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
+    javaStream.evictor(evictor)
+    this
+  }
+
+  // ------------------------------------------------------------------------
+  //  Operations on the keyed windows
+  // ------------------------------------------------------------------------
+  /**
+   * Applies a reduce function to the window. The window function is called for each evaluation
+   * of the window for each key individually. The output of the reduce function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try to pre-aggregate data as much as the window policies permit. For example,
+   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
+   * interval, so a few elements are stored per key (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce function to the window.
+   */
+  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduceWindow(clean(function))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R] = {
+    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
+  }
+
+
+  // ------------------------------------------------------------------------
+  //  Utilities
+  // ------------------------------------------------------------------------
+
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+  }
+
+}
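Again purely as an illustration that is not part of this diff: a keyed window evaluated
with a full WindowFunction, the pattern checked by WindowTranslationTest below. The key
type is the Java Tuple because keyBy(0) selects by field position; the summing logic and
object name are invented for the example:

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object KeyedWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("hello", 1), ("hello", 2))

    // Emit one summed record per key and window evaluation.
    val summed: DataStream[(String, Int)] = source
      .keyBy(0)
      .window(SlidingProcessingTimeWindows.of(1000, 100))
      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
        def apply(key: Tuple,
                  window: TimeWindow,
                  values: java.lang.Iterable[(String, Int)],
                  out: Collector[(String, Int)]): Unit = {
          var word = ""
          var sum = 0
          val it = values.iterator()
          while (it.hasNext) {
            val v = it.next()
            word = v._1
            sum += v._2
          }
          out.collect((word, sum))
        }
      })

    summed.print()
    env.execute("keyed-window-sketch")
  }
}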

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
new file mode 100644
index 0000000..35c7fcc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+
+import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, WatermarkTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
+import org.apache.flink.streaming.runtime.operators.windowing._
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.util.Collector
+
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+   * These tests ensure that the fast aligned time windows operator is used if the
+   * conditions are right.
+   *
+   * TODO: update once we have optimized aligned time windows operator for all-windows
+   */
+  @Ignore
+  @Test
+  def testFastTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+
+    val window2 = source
+      .keyBy(0)
+      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+      def apply(
+                    tuple: Tuple,
+                    window: TimeWindow,
+                    values: java.lang.Iterable[(String, Int)],
+                    out: Collector[(String, Int)]) { }
+    })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+  }
+
+  @Test
+  def testNonEvicting(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+      .trigger(CountTrigger.of(100))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
+    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .windowAll(TumblingProcessingTimeWindows.of(1000))
+      .trigger(CountTrigger.of(100))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
+      def apply(
+                    window: TimeWindow,
+                    values: java.lang.Iterable[(String, Int)],
+                    out: Collector[(String, Int)]) { }
+    })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
+    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+  }
+
+  @Test
+  def testEvicting(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+      .evictor(TimeEvictor.of(1000))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
+    val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .windowAll(TumblingProcessingTimeWindows.of(1000))
+      .trigger(CountTrigger.of(100))
+      .evictor(CountEvictor.of(1000))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
+      def apply(
+                    window: TimeWindow,
+                    values: java.lang.Iterable[(String, Int)],
+                    out: Collector[(String, Int)]) { }
+    })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
+    val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+  }
+}
+
+// ------------------------------------------------------------------------
+//  UDFs
+// ------------------------------------------------------------------------
+
+class DummyReducer extends RichReduceFunction[(String, Int)] {
+  def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
+    value1
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 66fe197..6ecdb85 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -44,6 +44,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
       "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getSecondInput",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
@@ -51,6 +54,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
 
+      "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
+
+      "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
+      "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
+
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." +
         "getStateHandleProvider",
@@ -114,6 +123,21 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       classOf[SplitDataStream[_]])
 
     checkMethods(
+      "WindowedStream", "WindowedStream",
+      classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]],
+      classOf[WindowedStream[_, _, _]])
+
+    checkMethods(
+      "AllWindowedStream", "AllWindowedStream",
+      classOf[org.apache.flink.streaming.api.datastream.AllWindowedStream[_, _]],
+      classOf[AllWindowedStream[_, _]])
+
+    checkMethods(
+      "KeyedStream", "KeyedStream",
+      classOf[org.apache.flink.streaming.api.datastream.KeyedStream[_, _]],
+      classOf[KeyedStream[_, _]])
+
+    checkMethods(
       "StreamJoinOperator", "StreamJoinOperator",
       classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
       classOf[StreamJoinOperator[_,_]])

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c9e78f/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
new file mode 100644
index 0000000..49d0a1a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
+import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.util.Collector
+
+import org.junit.Assert._
+import org.junit.Test
+
+class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+   * These tests ensure that the fast aligned time windows operator is used if the
+   * conditions are right.
+   */
+  @Test
+  def testFastTimeWindows(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+        .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+
+    val window2 = source
+      .keyBy(0)
+      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+            key: Tuple,
+            window: TimeWindow,
+            values: java.lang.Iterable[(String, Int)],
+            out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+  }
+
+  @Test
+  def testNonEvicting(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .trigger(CountTrigger.of(100))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingProcessingTimeWindows.of(1000))
+      .trigger(CountTrigger.of(100))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+      def apply(
+                    tuple: Tuple,
+                    window: TimeWindow,
+                    values: java.lang.Iterable[(String, Int)],
+                    out: Collector[(String, Int)]) { }
+    })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+  }
+
+  @Test
+  def testEvicting(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .evictor(TimeEvictor.of(1000))
+      .reduceWindow(reducer)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingProcessingTimeWindows.of(1000))
+      .trigger(CountTrigger.of(100))
+      .evictor(CountEvictor.of(1000))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+      def apply(
+                    tuple: Tuple,
+                    window: TimeWindow,
+                    values: java.lang.Iterable[(String, Int)],
+                    out: Collector[(String, Int)]) { }
+    })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+  }
+}

