flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests
Date Fri, 17 Feb 2017 16:24:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 09fe4b0b8 -> 5368a7d32


http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index bd3fe3d..600645b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.state.{AggregatingStateDescriptor, FoldingSta
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.operators.{OneInputStreamOperator, OutputTypeConfigurable}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -367,6 +367,90 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testReduceWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer,
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer,
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreReducerEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -408,6 +492,50 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testApplyWithPreReducerAndEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .evictor(CountEvictor.of(100))
+      .apply(
+        new DummyReducer,
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -584,6 +712,74 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testAggregateWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_,
_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_,
_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -814,7 +1010,7 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyWithPreFolderEventTime() {
+  def testFoldWithProcessWindowFunctionEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -823,16 +1019,15 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .apply(
+      .fold(
         ("", "", 1),
         new DummyFolder,
-        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow]
{
-          override def apply(
+        new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow]
{
+          override def process(
               key: String,
-              window: TimeWindow,
+              window: Context,
               input: Iterable[(String, String, Int)],
-              out: Collector[(String, String, Int)]): Unit =
-            input foreach {x => out.collect((x._1, x._2, x._3))}
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._3))}
         })
 
     val transform = window1
@@ -858,20 +1053,24 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+  def testFoldWithProcessWindowFunctionProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
       .keyBy(_._1)
-      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
       .fold(
         ("", "", 1),
-        { (acc: (String, String, Int), _) => acc },
-        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
-          in foreach { x => out.collect((x._1, x._3)) }
+        new DummyFolder,
+        new ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow]
{
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._3))}
         })
 
     val transform = window1
@@ -885,8 +1084,8 @@ class WindowTranslationTest {
     val winOperator = operator
       .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
-    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
     assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -896,12 +1095,8 @@ class WindowTranslationTest {
       ("hello", 1))
   }
 
-  // --------------------------------------------------------------------------
-  //  apply() tests
-  // --------------------------------------------------------------------------
-
   @Test
-  def testApplyEventTime() {
+  def testApplyWithPreFolderEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -911,12 +1106,15 @@ class WindowTranslationTest {
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
       .apply(
-        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow]
{
           override def apply(
               key: String,
               window: TimeWindow,
-              input: Iterable[(String, Int)],
-              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
         })
 
     val transform = window1
@@ -932,7 +1130,7 @@ class WindowTranslationTest {
 
     assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
     assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
       winOperator,
@@ -942,22 +1140,26 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyProcessingTimeTime() {
+  def testApplyWithPreFolderAndEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
       .keyBy(_._1)
-      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
       .apply(
-        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow]
{
           override def apply(
-              key: String,
-              window: TimeWindow,
-              input: Iterable[(String, Int)],
-              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+                              key: String,
+                              window: TimeWindow,
+                              input: Iterable[(String, String, Int)],
+                              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
         })
 
     val transform = window1
@@ -971,8 +1173,8 @@ class WindowTranslationTest {
     val winOperator = operator
       .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
@@ -983,7 +1185,7 @@ class WindowTranslationTest {
   }
 
   @Test
-  def testApplyEventTimeWithScalaFunction() {
+  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
@@ -992,9 +1194,12 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .apply { (key, window, in, out: Collector[(String, Int)]) =>
-        in foreach { x => out.collect(x)}
-      }
+      .fold(
+        ("", "", 1),
+        { (acc: (String, String, Int), _) => acc },
+        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+          in foreach { x => out.collect((x._1, x._3)) }
+        })
 
     val transform = window1
       .javaStream
@@ -1009,7 +1214,211 @@ class WindowTranslationTest {
 
     assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
     assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  // --------------------------------------------------------------------------
+  //  apply() tests
+  // --------------------------------------------------------------------------
+
+  @Test
+  def testApplyEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply { (key, window, in, out: Collector[(String, Int)]) =>
+        in foreach { x => out.collect(x)}
+      }
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
 
     processElementAndEnsureOutput[String, (String, Int), (String, Int)](
       winOperator,
@@ -1132,6 +1541,49 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testProcessWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testReduceWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1169,6 +1621,113 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testReduceWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .reduce(new DummyReducer, new TestProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .aggregate(new DummyAggregator())
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .aggregate(new DummyAggregator(), new TestProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testFoldWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1210,6 +1769,48 @@ class WindowTranslationTest {
   }
 
   @Test
+  def testFoldWithEvictorAndProcessFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .fold(("", "", 1), new DummyFolder, new TestFoldProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    winOperator.setOutputType(
+      window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
+      new ExecutionConfig)
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
   def testApplyWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1252,6 +1853,48 @@ class WindowTranslationTest {
       ("hello", 1))
   }
 
+  @Test
+  def testProcessWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .process(
+        new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def process(
+              key: String,
+              window: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <:
Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
   /**
     * Ensure that we get some output from the given operator when pushing in an element and
@@ -1266,6 +1909,12 @@ class WindowTranslationTest {
     val testHarness =
       new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
 
+    if (operator.isInstanceOf[OutputTypeConfigurable[String]]) {
+      // use a dummy type since window functions just need the ExecutionConfig
+      // this is also only needed for Fold, which we're getting rid off soon.
+      operator.asInstanceOf[OutputTypeConfigurable[String]]
+        .setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig)
+    }
     testHarness.open()
 
     testHarness.setProcessingTime(0)
@@ -1319,16 +1968,45 @@ class DummyRichAggregator extends RichAggregateFunction[(String, Int),
(String,
   override def add(value: (String, Int), accumulator: (String, Int)): Unit = ()
 }
 
-class TestWindowFunction extends WindowFunction[(String, Int), (String, Int), String, TimeWindow]
{
+class TestWindowFunction
+  extends WindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
 
   override def apply(
       key: String,
       window: TimeWindow,
       input: Iterable[(String, Int)],
-      out: Collector[(String, Int)]): Unit = {
+      out: Collector[(String, String, Int)]): Unit = {
+
+    input.foreach(e => out.collect((e._1, e._1, e._2)))
+  }
+}
+
+class TestProcessWindowFunction
+  extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow]
{
+
+  override def process(
+      key: String,
+      window: Context,
+      input: Iterable[(String, Int)],
+      out: Collector[(String, String, Int)]): Unit = {
 
-    input.foreach(out.collect)
+    input.foreach(e => out.collect((e._1, e._1, e._2)))
   }
 }
 
+class TestFoldProcessWindowFunction
+  extends ProcessWindowFunction[(String, String, Int), (String, Int), String, TimeWindow]
{
+
+  override def process(
+                        key: String,
+                        window: Context,
+                        input: Iterable[(String, String, Int)],
+                        out: Collector[(String, Int)]): Unit = {
+
+    input.foreach(e => out.collect((e._1, e._3)))
+  }
+}
+
+
+
 


Mime
View raw message