flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests
Date Wed, 11 Jan 2017 11:15:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2af939a10 -> aa220e487


http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 299932f..7235b22 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -18,234 +18,1067 @@
 
 package org.apache.flink.streaming.api.scala
 
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor}
-import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction, RichFoldFunction,
RichReduceFunction}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows,
TumblingEventTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, ProcessingTimeTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator,
AggregatingProcessingTimeWindowOperator, EvictingWindowOperator, WindowOperator}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, EventTimeTrigger,
ProcessingTimeTrigger, Trigger}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+import org.apache.flink.streaming.runtime.operators.windowing._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
-class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
+/**
+  * These tests verify that the api calls on [[WindowedStream]] instantiate the correct
+  * window operator.
+  *
+  * We also create a test harness and push one element into the operator to verify
+  * that we get some output.
+  */
+class WindowTranslationTest {
 
   /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   */
-  @Test
-  @Ignore
-  def testFastTimeWindows(): Unit = {
+    * .reduce() does not support [[RichReduceFunction]], since the reduce function is used
+    * internally in a [[org.apache.flink.api.common.state.ReducingState]].
+    */
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testReduceWithRichReducerFails() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
-    val window1 = source
+    source
       .keyBy(0)
-      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
-      .reduce(reducer)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new RichReduceFunction[(String, Int)] {
+        override def reduce(value1: (String, Int), value2: (String, Int)) = null
+      })
 
-    val transform1 = window1.javaStream.getTransformation
-        .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-    
-    val operator1 = transform1.getOperator
+    fail("exception was not thrown")
+  }
 
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
+  /**
+    * .fold() does not support [[RichFoldFunction]], since the reduce function is used internally
+    * in a [[org.apache.flink.api.common.state.FoldingState]].
+    */
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testFoldWithRichFolderFails() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val window2 = source
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    source
       .keyBy(0)
-      .timeWindow(Time.minutes(1))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-            key: Tuple,
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] {
+        override def fold(accumulator: (String, Int), value: (String, Int)) = null
+      })
+
+    fail("exception was not thrown")
+  }
+
+  @Test
+  def testSessionWithFoldFails() {
+    // verify that fold does not work with merging windows
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val windowedStream = env.fromElements("Hello", "Ciao")
+      .keyBy(x => x)
+      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+    try
+      windowedStream.fold("", new FoldFunction[String, String]() {
+        @throws[Exception]
+        def fold(accumulator: String, value: String): String = accumulator
+      })
+
+    catch {
+      case _: UnsupportedOperationException =>
+        // expected
+        // use a catch to ensure that the exception is thrown by the fold
+        return
+    }
+
+    fail("The fold call should fail.")
+  }
+
+  @Test
+  def testMergingAssignerWithNonMergingTriggerFails() {
+    // verify that we check for trigger compatibility
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val windowedStream = env.fromElements("Hello", "Ciao")
+      .keyBy(x => x)
+      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
+
+    try
+      windowedStream.trigger(new Trigger[String, TimeWindow]() {
+        def onElement(
+            element: String,
+            timestamp: Long,
             window: TimeWindow,
-            values: Iterable[(String, Int)],
-            out: Collector[(String, Int)]) { }
+            ctx: Trigger.TriggerContext) = null
+
+        def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext)
= null
+
+        def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = null
+
+        override def canMerge = false
+
+        def clear(window: TimeWindow, ctx: Trigger.TriggerContext) {}
       })
 
-    val transform2 = window2.javaStream.getTransformation
+    catch {
+      case _: UnsupportedOperationException =>
+        // expected
+        // use a catch to ensure that the exception is thrown by the fold
+        return
+    }
+
+    fail("The trigger call should fail.")
+  }
+
+  @Test
+  def testReduceEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
   @Test
-  def testNonEvicting(): Unit = {
+  def testReduceProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .keyBy(0)
-      .window(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer)
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .reduce( (x, _) => x )
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
-    assertTrue(
-      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-      def apply(
-                    tuple: Tuple,
-                    window: TimeWindow,
-                    values: Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
-    val transform2 = window2.javaStream.getTransformation
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow]
{
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+      })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
   @Test
-  def testEvicting(): Unit = {
+  def testReduceWithWindowFunctionProcessingTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow]
{
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithPreReducerEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduce(reducer)
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .apply(
+        new DummyReducer, new WindowFunction[(String, Int), (String, Int), String, TimeWindow]
{
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+        })
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
-    assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .evictor(CountEvictor.of(1000))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-      def apply(
-                    tuple: Tuple,
-                    window: TimeWindow,
-                    values: Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithWindowFunctionEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
-    val transform2 = window2.javaStream.getTransformation
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(
+        { (x, _) => x },
+        { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)}
})
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
   }
 
+
   @Test
-  def testPreReduce(): Unit = {
+  def testFoldEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
-    val reducer = new DummyReducer
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val window1 = source
-      .keyBy(0)
-      .window(SlidingEventTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]()
{
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
+      .keyBy(_._1)
+      .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder)
 
-    val transform1 = window1.javaStream.getTransformation
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator1 = transform1.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
 
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
-    assertTrue(
-      winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
 
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]()
{
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
-    val transform2 = window2.javaStream.getTransformation
+  @Test
+  def testFoldEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1)) { (acc, _) => acc }
+
+    val transform = window1
+      .javaStream
+      .getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
-    val operator2 = transform2.getOperator
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
 
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
-    assertTrue(
-      winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
+  def testFoldWithWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithPreFolderEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        ("", "", 1),
+        new DummyFolder,
+        new WindowFunction[(String, String, Int), (String, String, Int), String, TimeWindow]
{
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, String, Int)],
+              out: Collector[(String, String, Int)]): Unit =
+            input foreach {x => out.collect((x._1, x._2, x._3))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithWindowFunctionEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(
+        ("", "", 1),
+        { (acc: (String, String, Int), _) => acc },
+        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+          in foreach { x => out.collect((x._1, x._3)) }
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
+  def testApplyEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyProcessingTimeTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyEventTimeWithScalaFunction() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .apply { (key, window, in, out: Collector[(String, Int)]) =>
+        in foreach { x => out.collect(x)}
+      }
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  @Test
+  def testReduceWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithCustomTrigger() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .reduce(new DummyReducer)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(SlidingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .fold(("", "", 1), new DummyFolder)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[
+      EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    winOperator.setOutputType(
+      window1.javaStream.getType.asInstanceOf[TypeInformation[(String, Int)]],
+      new ExecutionConfig)
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testApplyWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .apply(
+        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
+          override def apply(
+              key: String,
+              window: TimeWindow,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1,
x._2))}
+        })
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <:
Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
+  /**
+    * Ensure that we get some output from the given operator when pushing in an element and
+    * setting watermark and processing time to `Long.MaxValue`.
+    */
+  @throws[Exception]
+  private def processElementAndEnsureOutput[K, IN, OUT](
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, K],
+      keyType: TypeInformation[K],
+      element: IN) {
+    val testHarness =
+      new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(0)
+    testHarness.processWatermark(Long.MinValue)
+
+    testHarness.processElement(new StreamRecord[IN](element, 0))
+
+    // provoke any processing-time/event-time triggers
+    testHarness.setProcessingTime(Long.MaxValue)
+    testHarness.processWatermark(Long.MaxValue)
+
+    // we at least get the two watermarks and should also see an output element
+    assertTrue(testHarness.getOutput.size >= 3)
+
+    testHarness.close()
+  }
+}
+
+class DummyReducer extends ReduceFunction[(String, Int)] {
+  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
+    value1
   }
 }
+
+class DummyFolder extends FoldFunction[(String, Int), (String, String, Int)] {
+  override def fold(acc: (String, String, Int), in: (String, Int)): (String, String, Int)
= {
+    acc
+  }
+}
+


Mime
View raw message