flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [09/20] flink git commit: [streaming] Test cases added for all new windowing invokables
Date Mon, 16 Feb 2015 14:25:35 GMT
[streaming] Test cases added for all new windowing invokables


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d10a235
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d10a235
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d10a235

Branch: refs/heads/master
Commit: 3d10a2352e41deb46b72fab5a07921a49464cf29
Parents: f571ece
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Feb 14 20:36:45 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/StreamWindow.java   |  3 +
 .../operator/windowing/WindowFlattenerTest.java | 53 ++++++++++++++
 .../windowing/WindowIntegrationTest.java        |  2 +-
 .../operator/windowing/WindowMapperTest.java    | 60 ++++++++++++++++
 .../operator/windowing/WindowMergerTest.java    | 75 ++++++++++++++++++++
 .../windowing/WindowPartitionerTest.java        | 75 ++++++++++++++++++++
 .../operator/windowing/WindowReducerTest.java   | 61 ++++++++++++++++
 7 files changed, 328 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index 74ed62e..49b40f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -140,6 +140,9 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 	 */
 	public List<StreamWindow<T>> split(int n) {
 		int numElements = size();
+		if (n == 0) {
+			return new ArrayList<StreamWindow<T>>();
+		}
 		if (n > numElements) {
 			return split(numElements);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
new file mode 100644
index 0000000..bee6733
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowFlattenerTest {
+	/** Checks that the flattener's output equals the elements of the input windows, concatenated in input order. */
+	@Test
+	public void test() {
+		StreamInvokable<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
+		// Two input windows: one holding 1, 2, 3 and one freshly constructed with nothing added.
+		StreamWindow<Integer> w1 = StreamWindow.fromElements(1, 2, 3);
+		StreamWindow<Integer> w2 = new StreamWindow<Integer>();
+
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(w1);
+		input.add(w2);
+		// Expected output is simply every element of w1 followed by every element of w2.
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.addAll(w1);
+		expected.addAll(w2);
+		// NOTE(review): createAndExecute presumably runs the invokable over the input and returns what it emitted -- confirm in MockContext.
+		List<Integer> output = MockContext.createAndExecute(flattener, input);
+
+		assertEquals(expected, output);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index a430700..05ac5fd 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -147,7 +147,7 @@ public class WindowIntegrationTest implements Serializable {
 
 	}
 
-	private <R> void validateOutput(List<R> expected, List<R> actual) {
+	public static <R> void validateOutput(List<R> expected, List<R> actual) {
 		assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
new file mode 100644
index 0000000..c4022ea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.WindowMapFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class WindowMapperTest {
+	/** Checks that a window mapper whose function forwards every value unchanged reproduces its input windows. */
+	@Test
+	public void test() {
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper
= new WindowMapper<Integer, Integer>(
+				new WindowMapFunction<Integer, Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void mapWindow(Iterable<Integer> values, Collector<Integer> out)
+							throws Exception {
+						for (Integer v : values) {
+							out.collect(v);
+						}
+					}
+				});
+		// Input: one window holding 1, 2, 3 and one freshly constructed window with nothing added.
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(StreamWindow.fromElements(1, 2, 3));
+		input.add(new StreamWindow<Integer>());
+
+		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMapper,
input);
+		// The map function collects each value as-is, so the output list is expected to equal the input list.
+		assertEquals(input, output);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
new file mode 100644
index 0000000..ac3f583
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowMergerTest {
+	/** Feeds whole and partial windows to the merger and checks the emitted windows against the expected ones. */
+	@Test
+	public void test() throws Exception {
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
+		// NOTE(review): w4_1 and w4_2 share constructor arguments (1, 2), presumably window id and part count -- confirm in StreamWindow.
+		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
+		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
+		StreamWindow<Integer> w3 = StreamWindow.fromElements(-1, 2, 3, 4);
+		StreamWindow<Integer> w4_1 = new StreamWindow<Integer>(1, 2);
+		StreamWindow<Integer> w4_2 = new StreamWindow<Integer>(1, 2);
+		w4_1.add(1);
+		w4_2.add(2);
+		// Expected: w1, w2 and w3 as they are, followed by one window holding the elements of w4_1 and w4_2.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(w1);
+		expected.add(w2);
+		expected.add(w3);
+		expected.add(StreamWindow.fromElements(1, 2));
+		// Input: w2 and w3 arrive as pieces (split / partitionBy), and the two pieces of w4 are not adjacent.
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(w1);
+		input.add(w4_1);
+		input.addAll(w2.split(2));
+		input.addAll(w3.partitionBy(new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value % 2;
+			}
+		}));
+		input.add(w4_2);
+
+		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);
+		// Sizes must match before the element-wise loop; windows are compared as sets, so order inside a window is ignored.
+		assertEquals(expected.size(), output.size());
+		for (int i = 0; i < output.size(); i++) {
+			assertEquals(new HashSet<Integer>(expected.get(i)), new HashSet<Integer>(output.get(i)));
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
new file mode 100644
index 0000000..da68211
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowPartitionerTest {
+	/** Checks both partitioner variants (count-based and key-based) against StreamWindow.split / partitionBy. */
+	@Test
+	public void test() throws Exception {
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner
= new WindowPartitioner<Integer>(
+				2);
+
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner
= new WindowPartitioner<Integer>(
+				new MyKey());
+		// Input windows: one freshly constructed with nothing added, one holding 1, 2, 3, 4.
+		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
+		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
+		// Expected output of the count-based partitioner: the result of split(2) on each input window, in order.
+		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
+		expected1.addAll(w1.split(2));
+		expected1.addAll(w2.split(2));
+		// Expected output of the key-based partitioner: the result of partitionBy(MyKey) on each input window, in order.
+		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
+		expected2.addAll(w1.partitionBy(new MyKey()));
+		expected2.addAll(w2.partitionBy(new MyKey()));
+
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(w1);
+		input.add(w2);
+
+		List<StreamWindow<Integer>> output1 = MockContext.createAndExecute(splitPartitioner,
input);
+		List<StreamWindow<Integer>> output2 = MockContext.createAndExecute(gbPartitioner,
input);
+
+		assertEquals(expected1, output1);
+		assertEquals(expected2, output2);
+
+	}
+	/** Key selector that maps each value to {@code value / 2} (integer division). */
+	private static class MyKey implements KeySelector<Integer, Object> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object getKey(Integer value) throws Exception {
+			return value / 2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d10a235/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
new file mode 100644
index 0000000..e3d742c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowReducerTest {
+	/** Checks that a window reducer using integer addition emits one window per input window with the expected contents. */
+	@Test
+	public void test() {
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer
= new WindowReducer<Integer>(
+				new ReduceFunction<Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer reduce(Integer value1, Integer value2) throws Exception {
+						return value1 + value2;
+					}
+				});
+		// Input: a three-element window, a freshly constructed window with nothing added, and a single-element window.
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(StreamWindow.fromElements(1, 2, 3));
+		input.add(new StreamWindow<Integer>());
+		input.add(StreamWindow.fromElements(-1));
+		// Expected: 1 + 2 + 3 = 6 for the first window; the other two windows are expected back with the same contents.
+		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(6));
+		expected.add(new StreamWindow<Integer>());
+		expected.add(StreamWindow.fromElements(-1));
+
+		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowReducer,
input);
+
+		assertEquals(expected, output);
+
+	}
+}


Mime
View raw message