flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
Date Mon, 29 Feb 2016 19:52:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0ac2b1a7b -> 9922d10a0


http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index a676757..f4101cb 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
TumblingTimeWindows, SlidingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
TumblingEventTimeWindows, SlidingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -59,7 +59,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingTimeWindows.of(
+      .windowAll(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
@@ -73,7 +73,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
     val window2 = source
       .keyBy(0)
-      .windowAll(SlidingTimeWindows.of(
+      .windowAll(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingTimeWindows.of(
+      .windowAll(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -114,13 +114,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
 
 
     val window2 = source
-      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
@@ -137,7 +137,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
@@ -170,7 +170,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
 
     val window2 = source
-      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -189,7 +189,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
@@ -203,7 +203,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingTimeWindows.of(
+      .window(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -223,14 +223,14 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
     assertTrue(
       winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]()
{
         def apply(
@@ -248,7 +248,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(
       winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 054f116..f700fa3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 
@@ -84,7 +84,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     source1.coGroup(source2)
       .where(_._1)
       .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
           "F:" + first.mkString("") + " S:" + second.mkString("")
       }
@@ -156,7 +156,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     source1.join(source2)
       .where(_._1)
       .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply( (l, r) => l.toString + ":" + r.toString)
       .addSink(new SinkFunction[String]() {
         def invoke(value: String) {
@@ -219,7 +219,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     source1.join(source1)
       .where(_._1)
       .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply( (l, r) => l.toString + ":" + r.toString)
       .addSink(new SinkFunction[String]() {
       def invoke(value: String) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index ea62925..c709c48 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermark
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Test
@@ -69,7 +69,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
     source1
       .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2
+ v._2) })
       .addSink(new SinkFunction[(String, Int)]() {
         def invoke(value: (String, Int)) {
@@ -116,7 +116,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
     }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
 
     source1
-      .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2
+ v._2) })
       .addSink(new SinkFunction[(String, Int)]() {
       def invoke(value: (String, Int)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 48ff640..60e61f0 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
TumblingTimeWindows, SlidingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
TumblingEventTimeWindows, SlidingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -91,7 +91,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingTimeWindows.of(
+      .window(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -105,14 +105,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
     assertTrue(
       winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
       def apply(
@@ -130,7 +130,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
   }
 
@@ -165,7 +165,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
@@ -185,7 +185,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
   }
 
@@ -199,7 +199,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingTimeWindows.of(
+      .window(SlidingEventTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -219,14 +219,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
     assertTrue(
       winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
 
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]()
{
         def apply(
@@ -244,7 +244,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
     assertTrue(
       winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
   }


Mime
View raw message