flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [21/28] git commit: [streaming] Tests added for Reduce, GroupReduce and BatchGroupReduce invokables
Date Fri, 29 Aug 2014 19:03:54 GMT
[streaming] Tests added for Reduce, GroupReduce and BatchGroupReduce invokables


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d5d97068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d5d97068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d5d97068

Branch: refs/heads/master
Commit: d5d970683e4f40016efba1b44f705b1f8e86c1db
Parents: 4207c5f
Author: gyfora <gyula.fora@gmail.com>
Authored: Wed Aug 27 16:32:39 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../operator/BatchGroupReduceTest.java          | 96 ++++++++++++++++++++
 .../operator/GroupReduceInvokableTest.java      | 54 +++++++++++
 .../invokable/operator/StreamReduceTest.java    | 54 +++++++++++
 .../flink/streaming/util/MockInvokable.java     |  1 +
 4 files changed, 205 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
new file mode 100755
index 0000000..bd6bfba
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class BatchGroupReduceTest {
+
+	public static final class MySlidingBatchReduce1 implements GroupReduceFunction<Integer,
String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterable<Integer> values, Collector<String> out) throws
Exception {
+			for (Integer value : values) {
+				out.collect(value.toString());
+			}
+			out.collect(END_OF_GROUP);
+		}
+	}
+
+	public static final class MySlidingBatchReduce2 extends
+			RichGroupReduceFunction<Tuple2<Integer, String>, String> {
+		private static final long serialVersionUID = 1L;
+
+		String openString;
+		
+		@Override
+		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<String>
out)
+				throws Exception {
+			out.collect(openString);
+			for (Tuple2<Integer, String> value : values) {
+				out.collect(value.f0.toString());
+			}
+			out.collect(END_OF_GROUP);
+		}
+		
+		@Override
+		public void open(Configuration c){
+			openString = "open";
+		}
+	}
+
+	private final static String END_OF_GROUP = "end of group";
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void slidingBatchGroupReduceTest() {
+		BatchGroupReduceInvokable<Integer, String> invokable1 = new BatchGroupReduceInvokable<Integer,
String>(
+				new MySlidingBatchReduce1(), 3, 2, 0);
+
+		List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "2", END_OF_GROUP,
"2",
+				END_OF_GROUP, "3", "3", END_OF_GROUP);
+		List<String> actual = MockInvokable.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+
+		BatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new
BatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
+				new MySlidingBatchReduce2(), 2, 2, 1);
+
+		expected = Arrays.asList("open","1", "2", END_OF_GROUP,"open", "3", END_OF_GROUP,"open",
"4", END_OF_GROUP);
+		actual = MockInvokable.createAndExecute(invokable2, Arrays.asList(
+				new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(4, "a")));
+
+		assertEquals(expected, actual);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java
new file mode 100755
index 0000000..ae28034
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+public class GroupReduceInvokableTest {
+
+	private static class MyReducer implements ReduceFunction<Integer>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1+value2;
+		}
+		
+	}
+	
+	@Test
+	public void test() {
+		GroupReduceInvokable<Integer> invokable1 = new GroupReduceInvokable<Integer>(
+				new MyReducer(),0);
+
+		List<Integer> expected = Arrays.asList(1,2,2,4,3);
+		List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
new file mode 100755
index 0000000..994cbb0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+public class StreamReduceTest {
+
+	private static class MyReducer implements ReduceFunction<Integer>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1+value2;
+		}
+		
+	}
+	
+	@Test
+	public void test() {
+		StreamReduceInvokable<Integer> invokable1 = new StreamReduceInvokable<Integer>(
+				new MyReducer());
+
+		List<Integer> expected = Arrays.asList(1,2,4,7,10);
+		List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index 1ea78e1..dd8b029 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -94,6 +94,7 @@ public class MockInvokable<IN, OUT> {
 		try {
 			invokable.open(null);
 			invokable.invoke();
+			invokable.close();
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot invoke invokable.", e);
 		}


Mime
View raw message