flink-commits mailing list archives

From aljos...@apache.org
Subject [01/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API
Date Fri, 09 Oct 2015 10:16:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0ee0c1f55 -> 0c1141abc
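
For orientation before the per-file diffs: the patch drops the old
WindowedDataStream helpers (Time, Delta, Count) in favor of the new
assigner/trigger/evictor model. A rough correspondence, offered as a reading
aid rather than an exhaustive migration guide (every name on the right-hand
side appears somewhere in the diffs below):

    // Removed helper-based API                  new assigner/trigger/evictor API
    // .window(Time.of(size, tsExtractor))   ->  .extractAscendingTimestamp(...) on the
    //                                           stream, then .window(GlobalWindows.create)
    //                                             .evictor(TimeEvictor.of(
    //                                               Time.of(size, TimeUnit.MILLISECONDS)))
    // .every(Delta.of(threshold, fn, init)) ->  .trigger(DeltaTrigger.of(threshold, deltaFn))
    // .window(Count.of(n))                  ->  .trigger(PurgingTrigger.of(
    //                                             CountTrigger.of[GlobalWindow](n)))
    // .flatten()                            ->  gone; window results are already a
    //                                           regular DataStream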


http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
index 7efb006..bf63695 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -122,31 +122,31 @@ public class TopSpeedWindowingExampleData {
 					"(0,100,2005.5555555555566,1424952017667)\n" + "(1,95,2211.1111111111118,1424952017668)";
 
 	public static final String TOP_SPEEDS =
-					"(0,55,15.277777777777777,1424951918630)\n" +
+			"(0,55,15.277777777777777,1424951918630)\n" +
 					"(1,50,26.38888888888889,1424951919632)\n" +
 					"(0,65,108.33333333333333,1424951924635)\n" +
 					"(1,50,26.38888888888889,1424951919632)\n" +
 					"(0,65,108.33333333333333,1424951924635)\n" +
-					"(1,55,159.7222222222222,1424951930637)\n" +
+					"(1,65,194.4444444444444,1424951932638)\n" +
 					"(0,65,108.33333333333333,1424951924635)\n" +
 					"(1,70,213.88888888888886,1424951933638)\n" +
-					"(0,60,156.94444444444443,1424951927636)\n" +
+					"(0,60,218.05555555555551,1424951931637)\n" +
 					"(1,75,272.2222222222222,1424951936639)\n" +
+					"(0,55,233.3333333333333,1424951932637)\n" +
 					"(1,75,272.2222222222222,1424951936639)\n" +
-					"(0,60,218.05555555555551,1424951931637)\n" +
 					"(1,75,272.2222222222222,1424951936639)\n" +
 					"(0,55,288.88888888888886,1424951936639)\n" +
-					"(1,75,272.2222222222222,1424951936639)\n" +
-					"(1,75,497.2222222222222,1424951948643)\n" +
-					"(0,55,344.44444444444446,1424951940640)\n" +
+					"(1,70,329.16666666666663,1424951939640)\n" +
+					"(0,55,373.61111111111114,1424951942641)\n" +
 					"(1,80,519.4444444444443,1424951949644)\n" +
-					"(0,50,413.88888888888897,1424951945642)\n" +
 					"(1,85,586.111111111111,1424951952645)\n" +
+					"(0,50,487.50000000000006,1424951951644)\n" +
 					"(1,85,586.111111111111,1424951952645)\n" +
-					"(0,55,573.6111111111112,1424951958646)\n" +
+					"(0,60,590.2777777777778,1424951959647)\n" +
 					"(1,85,586.111111111111,1424951952645)\n" +
-					"(0,70,627.7777777777778,1424951961647)\n" +
-					"(1,90,831.9444444444441,1424951963648)\n" +
+					"(0,75,648.6111111111112,1424951962648)\n" +
+					"(1,85,715.2777777777776,1424951958647)\n" +
+					"(1,95,858.333333333333,1424951964649)\n" +
 					"(0,80,670.8333333333334,1424951963648)\n" +
 					"(1,95,858.333333333333,1424951964649)\n" +
 					"(0,80,670.8333333333334,1424951963648)\n" +
@@ -158,22 +158,22 @@ public class TopSpeedWindowingExampleData {
 					"(1,100,937.4999999999998,1424951967650)\n" +
 					"(1,100,937.4999999999998,1424951967650)\n" +
 					"(0,85,861.1111111111112,1424951972651)\n" +
-					"(1,100,965.2777777777776,1424951968650)\n" +
+					"(1,100,993.0555555555554,1424951969650)\n" +
 					"(0,85,861.1111111111112,1424951972651)\n" +
-					"(1,100,1020.8333333333333,1424951970651)\n" +
-					"(1,100,1076.388888888889,1424951972651)\n" +
-					"(0,90,1058.3333333333335,1424951981654)\n" +
+					"(1,100,1048.611111111111,1424951971651)\n" +
 					"(1,100,1130.5555555555557,1424951974652)\n" +
-					"(0,95,1133.3333333333335,1424951984655)\n" +
-					"(1,100,1186.1111111111113,1424951976653)\n" +
+					"(0,90,1058.3333333333335,1424951981654)\n" +
+					"(1,100,1158.3333333333335,1424951975652)\n" +
 					"(0,95,1133.3333333333335,1424951984655)\n" +
 					"(1,100,1240.277777777778,1424951978653)\n" +
 					"(0,95,1133.3333333333335,1424951984655)\n" +
-					"(1,100,1295.8333333333337,1424951980654)\n" +
+					"(1,100,1268.0555555555559,1424951979654)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" +
+					"(1,100,1323.6111111111115,1424951981654)\n" +
 					"(0,95,1133.3333333333335,1424951984655)\n" +
-					"(1,100,1351.3888888888894,1424951982655)\n" +
+					"(1,100,1379.1666666666672,1424951983655)\n" +
 					"(0,100,1358.3333333333335,1424951993659)\n" +
-					"(1,100,1406.944444444445,1424951984656)\n" +
+					"(1,100,1563.8888888888896,1424951990658)\n" +
 					"(0,100,1358.3333333333335,1424951993659)\n" +
 					"(1,100,1563.8888888888896,1424951990658)\n" +
 					"(0,100,1358.3333333333335,1424951993659)\n" +
@@ -181,47 +181,47 @@ public class TopSpeedWindowingExampleData {
 					"(0,100,1358.3333333333335,1424951993659)\n" +
 					"(0,100,1358.3333333333335,1424951993659)\n" +
 					"(1,100,1669.4444444444453,1424951994659)\n" +
-					"(0,100,1386.1111111111113,1424951994659)\n" +
-					"(1,95,1695.8333333333342,1424951995660)\n" +
 					"(0,100,1440.277777777778,1424951996660)\n" +
-					"(1,90,1947.2222222222226,1424952006664)\n" +
+					"(1,90,1720.8333333333342,1424951996660)\n" +
+					"(0,100,1468.0555555555559,1424951997660)\n" +
+					"(1,95,1973.6111111111115,1424952007664)\n" +
 					"(0,100,1522.2222222222226,1424951999661)\n" +
 					"(0,100,1627.7777777777783,1424952003662)\n" +
 					"(1,95,1973.6111111111115,1424952007664)\n" +
 					"(0,100,1627.7777777777783,1424952003662)\n" +
 					"(1,95,1973.6111111111115,1424952007664)\n" +
-					"(0,100,1655.555555555556,1424952004663)\n" +
 					"(0,100,1709.7222222222229,1424952006663)\n" +
+					"(0,100,1737.5000000000007,1424952007664)\n" +
 					"(1,95,1973.6111111111115,1424952007664)\n" +
 					"(0,100,1791.6666666666674,1424952009664)\n" +
 					"(1,95,2211.1111111111118,1424952017668)\n";
 
 	public static final String TOP_CASE_CLASS_SPEEDS =
-					"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
+			"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
 					"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
 					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
 					"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
 					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
-					"CarEvent(1,55,159.7222222222222,1424951930637)\n" +
+					"CarEvent(1,65,194.4444444444444,1424951932638)\n" +
 					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
 					"CarEvent(1,70,213.88888888888886,1424951933638)\n" +
-					"CarEvent(0,60,156.94444444444443,1424951927636)\n" +
+					"CarEvent(0,60,218.05555555555551,1424951931637)\n" +
 					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+					"CarEvent(0,55,233.3333333333333,1424951932637)\n" +
 					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
-					"CarEvent(0,60,218.05555555555551,1424951931637)\n" +
 					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
 					"CarEvent(0,55,288.88888888888886,1424951936639)\n" +
-					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
-					"CarEvent(1,75,497.2222222222222,1424951948643)\n" +
-					"CarEvent(0,55,344.44444444444446,1424951940640)\n" +
+					"CarEvent(1,70,329.16666666666663,1424951939640)\n" +
+					"CarEvent(0,55,373.61111111111114,1424951942641)\n" +
 					"CarEvent(1,80,519.4444444444443,1424951949644)\n" +
-					"CarEvent(0,50,413.88888888888897,1424951945642)\n" +
 					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+					"CarEvent(0,50,487.50000000000006,1424951951644)\n" +
 					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
-					"CarEvent(0,55,573.6111111111112,1424951958646)\n" +
+					"CarEvent(0,60,590.2777777777778,1424951959647)\n" +
 					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
-					"CarEvent(0,70,627.7777777777778,1424951961647)\n" +
-					"CarEvent(1,90,831.9444444444441,1424951963648)\n" +
+					"CarEvent(0,75,648.6111111111112,1424951962648)\n" +
+					"CarEvent(1,85,715.2777777777776,1424951958647)\n" +
+					"CarEvent(1,95,858.333333333333,1424951964649)\n" +
 					"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
 					"CarEvent(1,95,858.333333333333,1424951964649)\n" +
 					"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
@@ -233,22 +233,22 @@ public class TopSpeedWindowingExampleData {
 					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
 					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
 					"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
-					"CarEvent(1,100,965.2777777777776,1424951968650)\n" +
+					"CarEvent(1,100,993.0555555555554,1424951969650)\n" +
 					"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
-					"CarEvent(1,100,1020.8333333333333,1424951970651)\n" +
-					"CarEvent(1,100,1076.388888888889,1424951972651)\n" +
-					"CarEvent(0,90,1058.3333333333335,1424951981654)\n" +
+					"CarEvent(1,100,1048.611111111111,1424951971651)\n" +
 					"CarEvent(1,100,1130.5555555555557,1424951974652)\n" +
-					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
-					"CarEvent(1,100,1186.1111111111113,1424951976653)\n" +
+					"CarEvent(0,90,1058.3333333333335,1424951981654)\n" +
+					"CarEvent(1,100,1158.3333333333335,1424951975652)\n" +
 					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
 					"CarEvent(1,100,1240.277777777778,1424951978653)\n" +
 					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
-					"CarEvent(1,100,1295.8333333333337,1424951980654)\n" +
+					"CarEvent(1,100,1268.0555555555559,1424951979654)\n" +
+					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+					"CarEvent(1,100,1323.6111111111115,1424951981654)\n" +
 					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
-					"CarEvent(1,100,1351.3888888888894,1424951982655)\n" +
+					"CarEvent(1,100,1379.1666666666672,1424951983655)\n" +
 					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
-					"CarEvent(1,100,1406.944444444445,1424951984656)\n" +
+					"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
 					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
 					"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
 					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
@@ -256,17 +256,17 @@ public class TopSpeedWindowingExampleData {
 					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
 					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
 					"CarEvent(1,100,1669.4444444444453,1424951994659)\n" +
-					"CarEvent(0,100,1386.1111111111113,1424951994659)\n" +
-					"CarEvent(1,95,1695.8333333333342,1424951995660)\n" +
 					"CarEvent(0,100,1440.277777777778,1424951996660)\n" +
-					"CarEvent(1,90,1947.2222222222226,1424952006664)\n" +
+					"CarEvent(1,90,1720.8333333333342,1424951996660)\n" +
+					"CarEvent(0,100,1468.0555555555559,1424951997660)\n" +
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
 					"CarEvent(0,100,1522.2222222222226,1424951999661)\n" +
 					"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
 					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
 					"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
 					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
-					"CarEvent(0,100,1655.555555555556,1424952004663)\n" +
 					"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
+					"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
 					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
 					"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
 					"CarEvent(1,95,2211.1111111111118,1424952017668)\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 8e3c7d6..1419afd 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -19,8 +19,15 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.windowing.{Delta, Time}
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
 
 import scala.Stream._
 import scala.math._
@@ -53,16 +60,23 @@ object TopSpeedWindowing {
     }
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
 
     val cars = setCarsInput(env)
 
-    val topSeed = cars.keyBy("carId")
-      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
-      .every(Delta.of[CarEvent](triggerMeters,
-          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
-      .local    
+    val topSeed = cars
+      .extractAscendingTimestamp( _.time )
+      .keyBy("carId")
+      .window(GlobalWindows.create)
+      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
+      .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
+        def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
+      }))
+//      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
+//      .every(Delta.of[CarEvent](triggerMeters,
+//          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
       .maxBy("speed")
-      .flatten()
 
     if (fileOutput) {
       topSeed.writeAsText(outputPath)

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 65cafb7..33104ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -27,9 +27,12 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
+import scala.collection.JavaConverters._
+
 /**
  * A [[AllWindowedStream]] represents a data stream where the stream of
  * elements is split into windows based on a
@@ -177,6 +180,28 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    val cleanedFunction = clean(function)
+    val applyFunction = new AllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------
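
The function-based apply added above can be called with a plain Scala
closure; annotating the parameter types pins down the result type R, the
same style the updated DataStreamTest.scala below uses. A minimal usage
sketch (the environment and input values are illustrative):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.fromElements("a", "b", "c", "d")
    val windowSizes: DataStream[Int] = lines
      .windowAll(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
      .apply((w: GlobalWindow, elements: Iterable[String], out: Collector[Int]) => {
        out.collect(elements.size)  // the whole window is buffered and passed as one Iterable
      })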

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fb4d75d..22abbdf 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -32,8 +32,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
 import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
@@ -105,8 +103,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def name(name: String) : DataStream[T] = javaStream match {
     case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
-    case _ => throw new
-        UnsupportedOperationException("Only supported for operators.")
+    case _ => throw new UnsupportedOperationException("Only supported for operators.")
     this
   }
   
@@ -583,37 +580,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Create a WindowedDataStream that can be used to apply
-   * transformation like .reduceWindow(...) or aggregations on
-   * preset chunks(windows) of the data stream. To define the windows a
-   * WindowingHelper such as Time, Count and
-   * Delta can be used.</br></br> When applied to a grouped data
-   * stream, the windows (evictions) and slide sizes (triggers) will be
-   * computed on a per group basis. </br></br> For more advanced control over
-   * the trigger and eviction policies please use to
-   * window(List(triggers), List(evicters))
-   */
-  def window(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] =
-    javaStream.window(windowingHelper)
-
-  /**
-   * Create a WindowedDataStream using the given Trigger and Eviction policies.
-   * Windowing can be used to apply transformation like .reduceWindow(...) or 
-   * aggregations on preset chunks(windows) of the data stream.</br></br>For most common
-   * use-cases please refer to window(WindowingHelper[_])
-   *
-   */
-  def window(trigger: TriggerPolicy[T], eviction: EvictionPolicy[T]):
-    WindowedDataStream[T] = javaStream.window(trigger, eviction)
-    
-  /**
-   * Create a WindowedDataStream based on the full stream history to perform periodic
-   * aggregations.
-   */  
-  def every(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] = 
-    javaStream.every(windowingHelper)
-
-  /**
    * Windows this DataStream into tumbling time windows.
    *
    * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
deleted file mode 100644
index 8ef94f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.scala.ClosureCleaner
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream, DiscretizedStream}
-import org.apache.flink.streaming.api.functions.WindowMapFunction
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.windowing.StreamWindow
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.util.Collector
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
-
-class WindowedDataStream[T](javaStream: JavaWStream[T]) {
-
-  /**
-   * Gets the name of the current data stream. This name is
-   * used by the visualization and logging during runtime.
-   *
-   * @return Name of the stream.
-   */
-  def getName : String = javaStream match {
-    case stream : DiscretizedStream[_] => stream.getName
-    case _ => throw new
-        UnsupportedOperationException("Only supported for windowing operators.")
-  }
-
-  /**
-   * Sets the name of the current data stream. This name is
-   * used by the visualization and logging during runtime.
-   *
-   * @return The named operator
-   */
-  def name(name: String) : WindowedDataStream[T] = javaStream match {
-    case stream : DiscretizedStream[T] => stream.name(name)
-    case _ => throw new
-        UnsupportedOperationException("Only supported for windowing operators.")
-    this
-  }
-
-  /**
-   * Defines the slide size (trigger frequency) for the windowed data stream.
-   * This controls how often the user defined function will be triggered on
-   * the window.
-   */
-  def every(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] =
-    javaStream.every(windowingHelper)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field positions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a global fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def keyBy(fields: Int*): WindowedDataStream[T] = javaStream.keyBy(fields: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field expressions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a global fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def keyBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-   javaStream.keyBy(firstField +: otherFields.toArray: _*)
-    
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * KeySelector function. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a global fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def keyBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
-
-    val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.keyBy(keyExtractor)
-  }
-  
-  /**
-   * Sets the window discretisation local, meaning that windows will be
-   * created in parallel at environment parallelism.
-   * 
-   */
-  def local(): WindowedDataStream[T] = javaStream.local
- 
-  /**
-   * Flattens the result of a window transformation returning the stream of window
-   * contents elementwise.
-   */
-  def flatten(): DataStream[T] = javaStream.flatten()
-  
-  /**
-   * Returns the stream of StreamWindows created by the window tranformation
-   */
-  def getDiscretizedStream(): DataStream[StreamWindow[T]] = javaStream.getDiscretizedStream()
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduceWindow(reducer: ReduceFunction[T]): WindowedDataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream.reduceWindow(reducer)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduceWindow(fun: (T, T) => T): WindowedDataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduceWindow(reducer)
-  }
-
-  /**
-   * Applies a fold transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
-  WindowedDataStream[R] = {
-    if (folder == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    javaStream.foldWindow(initialValue, folder, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies a fold transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, fun: (R, T) => R):
-  WindowedDataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = { cleanFun(acc, v) }
-    }
-    foldWindow(initialValue, folder)
-  }
-
-  /**
-   * Applies a mapWindow transformation on the windowed data stream by calling the mapWindow
-   * method on current window at every trigger. In contrast with the simple binary reduce 
-   * operator, mapWindow exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduceWindow instead of mapWindow for increased efficiency
-   */
-  def mapWindow[R: ClassTag: TypeInformation](reducer: WindowMapFunction[T, R]):
-  WindowedDataStream[R] = {
-    if (reducer == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    javaStream.mapWindow(reducer, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies a mapWindow transformation on the windowed data stream by calling the mapWindow
-   * method on current window at every trigger. In contrast with the simple binary reduce 
-   * operator, mapWindow exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduceWindow instead of mapWindow for increased efficiency
-   */
-  def mapWindow[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
-  WindowedDataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val reducer = new WindowMapFunction[T, R] {
-      def mapWindow(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in.asScala, out) }
-    }
-    mapWindow(reducer)
-  }
-
-  /**
-   * Applies an aggregation that that gives the maximum of the elements in the window at
-   * the given position.
-   *
-   */
-  def max(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MAX, position)
-  
-  /**
-   * Applies an aggregation that that gives the maximum of the elements in the window at
-   * the given field.
-   *
-   */
-  def max(field: String): WindowedDataStream[T] = aggregate(AggregationType.MAX, field)
-
-  /**
-   * Applies an aggregation that that gives the minimum of the elements in the window at
-   * the given position.
-   *
-   */
-  def min(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MIN, position)
-  
-  /**
-   * Applies an aggregation that that gives the minimum of the elements in the window at
-   * the given field.
-   *
-   */
-  def min(field: String): WindowedDataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   *
-   */
-  def sum(position: Int): WindowedDataStream[T] = aggregate(AggregationType.SUM, position)
-  
-  /**
-   * Applies an aggregation that sums the elements in the window at the given field.
-   *
-   */
-  def sum(field: String): WindowedDataStream[T] = aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that that gives the maximum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def maxBy(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MAXBY,
-    position)
-    
-  /**
-   * Applies an aggregation that that gives the maximum element of the window by
-   * the given field. When equality, returns the first.
-   *
-   */
-  def maxBy(field: String): WindowedDataStream[T] = aggregate(AggregationType.MAXBY,
-    field)
-
-  /**
-   * Applies an aggregation that that gives the minimum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def minBy(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MINBY,
-    position)
-    
-   /**
-   * Applies an aggregation that that gives the minimum element of the window by
-   * the given field. When equality, returns the first.
-   *
-   */
-  def minBy(field: String): WindowedDataStream[T] = aggregate(AggregationType.MINBY,
-    field)
-    
-  private def aggregate(aggregationType: AggregationType, field: String): 
-  WindowedDataStream[T] = {
-    val position = fieldNames2Indices(getType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }  
-
-  def aggregate(aggregationType: AggregationType, position: Int):
-  WindowedDataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM =>
-        new SumAggregator(position, jStream.getType, jStream.getExecutionConfig)
-      case _ =>
-        new ComparableAggregator(position, jStream.getType, aggregationType, true,
-          jStream.getExecutionConfig)
-    }
-
-    new WindowedDataStream[Product](
-            jStream.reduceWindow(reducer)).asInstanceOf[WindowedDataStream[T]]
-  }
-
-  /**
-   * Gets the output type.
-   *
-   * @return The output type.
-   */
-  def getType(): TypeInformation[T] = javaStream.getType
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(
-      javaStream.getDiscretizedStream.getExecutionEnvironment).scalaClean(f)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a8ddaf8..d4f4618 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -27,9 +27,12 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
+import scala.collection.JavaConverters._
+
 /**
  * A [[WindowedStream]] represents a data stream where elements are grouped by
  * key, and for each key, the stream of elements is split into windows based on a
@@ -180,6 +183,28 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    val cleanedFunction = clean(function)
+    val applyFunction = new WindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanedFunction(key, window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------
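
The keyed counterpart takes the key as its first argument. A minimal usage
sketch in the same spirit (environment and input are illustrative):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements("a", "b", "a", "b")
    val perKeyCounts: DataStream[(String, Int)] = words
      .keyBy(w => w)
      .window(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
      .apply((key: String, w: GlobalWindow, elems: Iterable[String], out: Collector[(String, Int)]) => {
        out.collect((key, elems.size))  // one record per key and window evaluation
      })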

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index d65ea41..e668064 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
 import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
@@ -41,9 +40,6 @@ package object scala {
   implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
   KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
 
-  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
-    new WindowedDataStream[R](javaWStream)
-
   implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
     new SplitStream[R](javaStream)
 
@@ -75,7 +71,6 @@ package object scala {
 
   def createTuple2TypeInformation[T1, T2](
       t1: TypeInformation[T1],
-      t2: TypeInformation[T2])
-    : TypeInformation[(T1, T2)] =
+      t2: TypeInformation[T2]) : TypeInformation[(T1, T2)] =
     apiTupleCreator[T1, T2](t1, t2)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
deleted file mode 100644
index 461ad3c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.windowing
-
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
-import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-
-object Delta {
-
-  /**
-   * Creates a delta helper representing a delta trigger or eviction policy.
-   * </br></br> This policy calculates a delta between the data point which
-   * triggered last and the currently arrived data point. It triggers if the
-   * delta is higher than a specified threshold. </br></br> In case it gets
-   * used for eviction, this policy starts from the first element of the
-   * buffer and removes all elements from the buffer which have a higher delta
-   * then the threshold. As soon as there is an element with a lower delta,
-   * the eviction stops.
-   */
-  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
-    require(deltaFunction != null, "Delta function must not be null")
-    val df = new DeltaFunction[T] {
-      ClosureCleaner.clean(deltaFunction, true)
-      override def getDelta(first: T, second: T) = deltaFunction(first, second)
-    }
-    JavaDelta.of(threshold, df, initVal)
-  }
-
-}
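
The removed Delta.of helper was what let Scala code pass a plain closure as a
delta policy. The DeltaTrigger used in the rewritten TopSpeedWindowing above
takes a DeltaFunction instead, so a small adapter restores the closure style.
A sketch; the deltaOf helper is hypothetical and not part of this patch:

    import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction

    // Hypothetical adapter: wraps a Scala closure as a DeltaFunction.
    def deltaOf[T](fn: (T, T) => Double): DeltaFunction[T] = new DeltaFunction[T] {
      def getDelta(oldPoint: T, newPoint: T): Double = fn(oldPoint, newPoint)
    }

    // usage, with triggerMeters and CarEvent as in TopSpeedWindowing.scala above:
    //   .trigger(DeltaTrigger.of(triggerMeters,
    //     deltaOf[CarEvent]((oldSp, newSp) => newSp.distance - oldSp.distance)))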

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
deleted file mode 100644
index 5cea95b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.windowing
-
-import java.util.concurrent.TimeUnit
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-
-object Time {
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using System time.
-   *
-   */
-  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
-    JavaTime.of(windowSize, timeUnit)
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using a user defined timestamp extractor.
-   *
-   */
-  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
-    require(timestamp != null, "Timestamp must not be null.")
-    val ts = new Timestamp[R] {
-      ClosureCleaner.clean(timestamp, true)
-      override def getTimestamp(in: R) = timestamp(in)
-    }
-    JavaTime.of(windowSize, ts, startTime)
-  }
-
-}
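
The removed Time helper bundled two concerns that the new model separates:
per-element timestamp extraction now happens on the stream itself, and the
window-size semantics move into a TimeEvictor. A sketch using the pieces from
the TopSpeedWindowing rewrite above (env, cars, CarEvent and evictionSec as
defined there):

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
    import org.apache.flink.streaming.api.windowing.time.Time

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val windowed = cars
      .extractAscendingTimestamp(_.time)  // was: Time.of(size, (car: CarEvent) => car.time)
      .keyBy("carId")
      .window(GlobalWindows.create)
      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
      // a trigger (e.g. the DeltaTrigger above) still decides when the window fires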

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 5a5a8c9..91639ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -19,14 +19,15 @@
 package org.apache.flink.streaming.api.scala
 
 import java.lang
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction,
-  Partitioner, FoldFunction, Function}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, FoldFunction, Function}
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
-import org.apache.flink.streaming.api.windowing.helper.Count
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
@@ -56,21 +57,24 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert("testReduce" == dataStream2.getName)
 
     val connected = dataStream1.connect(dataStream2)
-      .flatMap(
-    { (in, out: Collector[Long]) => }, { (in, out: Collector[Long]) => }
-    ).name("testCoFlatMap")
+      .flatMap({ (in, out: Collector[(Long, Long)]) => }, { (in, out: Collector[(Long, Long)]) => })
+      .name("testCoFlatMap")
+
     assert("testCoFlatMap" == connected.getName)
 
-    val func: ((Long, Long) => Long) =
-      (x: Long, y: Long) => 0L
+    val func: (((Long, Long), (Long, Long)) => (Long, Long)) =
+      (x: (Long, Long), y: (Long, Long)) => (0L, 0L)
 
-    val windowed = connected.window(Count.of(10))
-      .foldWindow(0L, func)
+    val windowed = connected
+      .windowAll(GlobalWindows.create())
+      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
+      .fold((0L, 0L), func)
 
     windowed.name("testWindowFold")
+
     assert("testWindowFold" == windowed.getName)
 
-    windowed.flatten().print()
+    windowed.print()
 
     val plan = env.getExecutionPlan
 
@@ -239,11 +243,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
 
     val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
-    val map = src.map(x => 0L)
-    val windowed: DataStream[Long] = map
-      .window(Count.of(10))
-      .foldWindow(0L, (x: Long, y: Long) => 0L)
-      .flatten
+    val map = src.map(x => (0L, 0L))
+    val windowed: DataStream[(Long, Long)] = map
+      .windowAll(GlobalWindows.create())
+      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
+      .fold((0L, 0L), (x: (Long, Long), y: (Long, Long)) => (0L, 0L))
+
     windowed.print()
     val sink = map.addSink(x => {})
 
@@ -294,15 +299,17 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val map: DataStream[(Integer, String)] = src1.map(x => null)
     assert(classOf[scala.Tuple2[Integer, String]] == map.getType.getTypeClass)
 
-    val window: WindowedDataStream[String] = map
-      .window(Count.of(5))
-      .mapWindow((x: Iterable[(Integer, String)], y: Collector[String]) => {})
+    val window: DataStream[String] = map
+      .windowAll(GlobalWindows.create())
+      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
+      .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
+
     assert(TypeExtractor.getForClass(classOf[String]) == window.getType)
 
     val flatten: DataStream[Int] = window
-      .foldWindow(0,
-        (accumulator: Int, value: String) => 0
-      ).flatten
+      .windowAll(GlobalWindows.create())
+      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
+      .fold(0, (accumulator: Int, value: String) => 0)
     assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType)
 
     // TODO check for custom case class
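
One detail of the updated test worth spelling out: CountTrigger on its own
fires the window but keeps its contents, so each firing would fold over every
element seen so far; wrapping it in PurgingTrigger clears the window after
each firing, which is what reproduces the old Count.of(n) tumbling behaviour.
A minimal sketch (input values are illustrative):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sums = env.fromElements(1L, 2L, 3L, 4L)
      .windowAll(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
      .fold(0L, (acc: Long, v: Long) => acc + v)  // emits 3, then 7
    sums.print()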

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 53aa1e2..101f3b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 import java.lang.reflect.Method
 
 import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, JoinedStreams}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 
 import scala.language.existentials
 
@@ -149,10 +149,5 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
       classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
       classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
-
-    checkMethods(
-      "WindowedDataStream", "WindowedDataStream",
-      classOf[org.apache.flink.streaming.api.datastream.WindowedDataStream[_]],
-      classOf[WindowedDataStream[_]])
   }
 }

