flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5933] Allow Evictor for merging windows
Date Thu, 04 May 2017 10:45:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master c969237fc -> 99fb1f881


[FLINK-5933] Allow Evictor for merging windows


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99fb1f88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99fb1f88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99fb1f88

Branch: refs/heads/master
Commit: 99fb1f88121f7d3987323d3a0a4880a6684c8811
Parents: 8a47004
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed May 3 15:18:05 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu May 4 12:43:56 2017 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  4 --
 .../api/datastream/WindowedStream.java          |  4 --
 .../windowing/AllWindowTranslationTest.java     | 39 ++++++++++++++++++++
 .../operators/windowing/WindowOperatorTest.java | 32 ----------------
 .../windowing/WindowTranslationTest.java        | 24 ++++++++++++
 .../api/scala/AllWindowTranslationTest.scala    | 35 ++++++++++++++++++
 .../api/scala/WindowTranslationTest.scala       | 35 ++++++++++++++++++
 7 files changed, 133 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 772e635..0d953a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -175,10 +175,6 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
-		}
-
 		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
 			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 688e9b6..2d7dafe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -191,10 +191,6 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
-		}
-
 		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
 			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 468f14c..81a9275 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -235,6 +235,30 @@ public class AllWindowTranslationTest {
 		fail("The trigger call should fail.");
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testMergingWindowsWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
+				.evictor(CountEvictor.of(5))
+				.process(new TestProcessAllWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	// ------------------------------------------------------------------------
 	//  reduce() translation tests
 	// ------------------------------------------------------------------------
@@ -1515,4 +1539,19 @@ public class AllWindowTranslationTest {
 			}
 		}
 	}
+
+	private static class TestProcessAllWindowFunction
+			extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow> {
+
+		@Override
+		public void process(Context ctx,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, String, Integer>> out) throws Exception {
+
+			for (Tuple2<String, Integer> in : values) {
+				out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 474548c..ce84501 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -801,38 +801,6 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testMergeAndEvictor() throws Exception {
-		// verify that merging WindowAssigner and Evictor cannot be used together
-
-		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
-
-		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
-				.keyBy(new KeySelector<String, String>() {
-					private static final long serialVersionUID = -887743259776124087L;
-
-					@Override
-					public String getKey(String value) throws Exception {
-						return value;
-					}
-				})
-				.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
-
-		try {
-			windowedStream.evictor(CountEvictor.of(13));
-
-		} catch (UnsupportedOperationException e) {
-			// expected
-			// use a catch to ensure that the exception is thrown by the fold
-			return;
-		}
-
-		fail("The evictor call should fail.");
-
-		env.execute();
-
-	}
-
-	@Test
 	@SuppressWarnings("unchecked")
 	/**
 	 * This tests a custom Session window assigner that assigns some elements to "point windows",

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index b899948..5071c37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -253,6 +253,30 @@ public class WindowTranslationTest {
 		fail("The trigger call should fail.");
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testMergingWindowsWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
+				.evictor(CountEvictor.of(5))
+				.process(new TestProcessWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
 
 	// ------------------------------------------------------------------------
 	//  Reduce Translation Tests

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index ee9f50c..e8d5a12 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -174,6 +174,41 @@ class AllWindowTranslationTest {
     fail("The trigger call should fail.")
   }
 
+  @Test
+  def testMergingWindowsWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(1)))
+      .evictor(CountEvictor.of(2))
+      .process(new TestProcessAllWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[EventTimeSessionWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+
   // ------------------------------------------------------------------------
   //  reduce() translation tests
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb1f88/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 600645b..cc55f0d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -178,6 +178,41 @@ class WindowTranslationTest {
     fail("The trigger call should fail.")
   }
 
+  @Test
+  def testMergingWindowsWithEvictor() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val window1 = source
+      .keyBy(_._1)
+      .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
+      .evictor(CountEvictor.of(2))
+      .process(new TestProcessWindowFunction)
+
+    val transform = window1
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator = transform.getOperator
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val winOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(winOperator.getWindowAssigner.isInstanceOf[EventTimeSessionWindows])
+    assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      winOperator,
+      winOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
   // --------------------------------------------------------------------------
   //  reduce() tests
   // --------------------------------------------------------------------------


Mime
View raw message