flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [5/6] git commit: [streaming] Improved tests for CoReduceInvokables
Date Wed, 01 Oct 2014 15:45:05 GMT
[streaming] Improved tests for CoReduceInvokables


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9e722dfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9e722dfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9e722dfd

Branch: refs/heads/master
Commit: 9e722dfd5d2b9aa8046d4fe33167daf938ee5a3b
Parents: 127470b
Author: szape <nemderogatorius@gmail.com>
Authored: Tue Sep 30 20:20:18 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Wed Oct 1 17:11:38 2014 +0200

----------------------------------------------------------------------
 .../invokable/operator/CoBatchReduceTest.java   |  83 +++++++++---
 .../operator/CoGroupedBatchReduceTest.java      | 108 +++++++++++-----
 .../operator/CoGroupedWindowReduceTest.java     | 125 +++++++++++++------
 .../invokable/operator/CoWindowReduceTest.java  |  91 ++++++++++----
 4 files changed, 299 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
index 1741bb4..fd7439f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 public class CoBatchReduceTest {
 
-	private static class MyCoReduceFunction implements CoReduceFunction<Integer, Integer,
String> {
+	private static class MyCoReduceFunction implements CoReduceFunction<Integer, String,
String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -39,7 +39,7 @@ public class CoBatchReduceTest {
 		}
 
 		@Override
-		public Integer reduce2(Integer value1, Integer value2) {
+		public String reduce2(String value1, String value2) {
 			return value1 + value2;
 		}
 
@@ -49,38 +49,83 @@ public class CoBatchReduceTest {
 		}
 
 		@Override
-		public String map2(Integer value) {
-			return value.toString();
+		public String map2(String value) {
+			return value;
 		}
 	}
 
 	@Test
-	public void coBatchReduceTest() {
+	public void coBatchReduceTest1() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+
+		List<String> inputs2 = new ArrayList<String>();
+		inputs2.add("a");
+		inputs2.add("b");
+		inputs2.add("c");
+		inputs2.add("d");
+		inputs2.add("e");
+		inputs2.add("f");
+		inputs2.add("g");
+		inputs2.add("h");
+		inputs2.add("i");
+
+		CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer,
String, String>(
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L);
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("10");
+		expected.add("26");
+		expected.add("19");
+		expected.add("abc");
+		expected.add("def");
+		expected.add("ghi");
+
+		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+
+		Collections.sort(result);
+		Collections.sort(expected);
+
+		assertEquals(expected, result);
+
+	}
+
+	@Test
+	public void coBatchReduceTest2() {
 
 		List<Integer> inputs = new ArrayList<Integer>();
 		for (Integer i = 1; i <= 10; i++) {
 			inputs.add(i);
 		}
 
-		List<Integer> inputs2 = new ArrayList<Integer>();
-		inputs2.add(1);
-		inputs2.add(2);
-		inputs2.add(-1);
-		inputs2.add(-3);
-		inputs2.add(-4);
+		List<String> inputs2 = new ArrayList<String>();
+		inputs2.add("a");
+		inputs2.add("b");
+		inputs2.add("c");
+		inputs2.add("d");
+		inputs2.add("e");
+		inputs2.add("f");
+		inputs2.add("g");
+		inputs2.add("h");
+		inputs2.add("i");
 
-		CoBatchReduceInvokable<Integer, Integer, String> invokable = new CoBatchReduceInvokable<Integer,
Integer, String>(
-				new MyCoReduceFunction(), 3L, 3L, 2L, 2L);
+		CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer,
String, String>(
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L);
 
 		List<String> expected = new ArrayList<String>();
-		expected.add("6");
-		expected.add("12");
+		expected.add("10");
 		expected.add("18");
-		expected.add("24");
+		expected.add("26");
+		expected.add("34");
 		expected.add("19");
-		expected.add("2");
-		expected.add("-8");
-		expected.add("-4");
+		expected.add("abc");
+		expected.add("cde");
+		expected.add("efg");
+		expected.add("ghi");
+		expected.add("i");
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index 31b8348..0689fca 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -32,7 +32,7 @@ import org.junit.Test;
 public class CoGroupedBatchReduceTest {
 
 	private static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>
{
+			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String>
{
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -42,9 +42,9 @@ public class CoGroupedBatchReduceTest {
 		}
 
 		@Override
-		public Tuple2<String, Integer> reduce2(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) {
-			return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
+		public Tuple2<String, String> reduce2(Tuple2<String, String> value1,
+				Tuple2<String, String> value2) {
+			return new Tuple2<String, String>("a", value1.f1 + value2.f1);
 		}
 
 		@Override
@@ -53,48 +53,96 @@ public class CoGroupedBatchReduceTest {
 		}
 
 		@Override
-		public String map2(Tuple2<String, Integer> value) {
-			return value.f1.toString();
+		public String map2(Tuple2<String, String> value) {
+			return value.f1;
 		}
 	}
 
 	@Test
-	public void coGroupedBatchReduceTest() {
+	public void coGroupedBatchReduceTest1() {
 
 		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String,
Integer>>();
 		inputs1.add(new Tuple2<String, Integer>("a", 1));
 		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 5));
+		inputs1.add(new Tuple2<String, Integer>("a", 3));
+		inputs1.add(new Tuple2<String, Integer>("a", 4));
+		inputs1.add(new Tuple2<String, Integer>("a", 5));
+		inputs1.add(new Tuple2<String, Integer>("b", 6));
 		inputs1.add(new Tuple2<String, Integer>("a", 7));
+		inputs1.add(new Tuple2<String, Integer>("b", 8));
 		inputs1.add(new Tuple2<String, Integer>("b", 9));
 		inputs1.add(new Tuple2<String, Integer>("b", 10));
 
-		List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String,
Integer>>();
-		inputs2.add(new Tuple2<String, Integer>("a", 1));
-		inputs2.add(new Tuple2<String, Integer>("a", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 5));
-		inputs2.add(new Tuple2<String, Integer>("a", 7));
-		inputs2.add(new Tuple2<String, Integer>("b", 9));
-		inputs2.add(new Tuple2<String, Integer>("b", 10));
+		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
+		inputs2.add(new Tuple2<String, String>("1", "a"));
+		inputs2.add(new Tuple2<String, String>("2", "b"));
+		inputs2.add(new Tuple2<String, String>("1", "c"));
+		inputs2.add(new Tuple2<String, String>("2", "d"));
+		inputs2.add(new Tuple2<String, String>("1", "e"));
+		inputs2.add(new Tuple2<String, String>("2", "f"));
+		inputs2.add(new Tuple2<String, String>("1", "g"));
+		inputs2.add(new Tuple2<String, String>("2", "h"));
+		inputs2.add(new Tuple2<String, String>("1", "i"));
 
 		List<String> expected = new ArrayList<String>();
 		expected.add("10");
-		expected.add("7");
-		expected.add("9");
-		expected.add("24");
-		expected.add("10");
-		expected.add("10");
-		expected.add("7");
-		expected.add("9");
-		expected.add("24");
-		expected.add("10");
+		expected.add("12");
+		expected.add("33");
+		expected.add("ace");
+		expected.add("gi");
+		expected.add("bdf");
+		expected.add("h");
+
+		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String>(
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 0, 0);
+
+		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+
+		Collections.sort(result);
+		Collections.sort(expected);
+		assertEquals(expected, result);
+	}
+
+	@Test
+	public void coGroupedBatchReduceTest2() {
 
-		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>,
String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, Integer>, String>(
-				new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0);
+		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String,
Integer>>();
+		inputs1.add(new Tuple2<String, Integer>("a", 1));
+		inputs1.add(new Tuple2<String, Integer>("a", 2));
+		inputs1.add(new Tuple2<String, Integer>("a", 3));
+		inputs1.add(new Tuple2<String, Integer>("a", 4));
+		inputs1.add(new Tuple2<String, Integer>("a", 5));
+		inputs1.add(new Tuple2<String, Integer>("b", 6));
+		inputs1.add(new Tuple2<String, Integer>("a", 7));
+		inputs1.add(new Tuple2<String, Integer>("b", 8));
+		inputs1.add(new Tuple2<String, Integer>("b", 9));
+		inputs1.add(new Tuple2<String, Integer>("b", 10));
+
+		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
+		inputs2.add(new Tuple2<String, String>("1", "a"));
+		inputs2.add(new Tuple2<String, String>("2", "b"));
+		inputs2.add(new Tuple2<String, String>("1", "c"));
+		inputs2.add(new Tuple2<String, String>("2", "d"));
+		inputs2.add(new Tuple2<String, String>("1", "e"));
+		inputs2.add(new Tuple2<String, String>("2", "f"));
+		inputs2.add(new Tuple2<String, String>("1", "g"));
+		inputs2.add(new Tuple2<String, String>("2", "h"));
+		inputs2.add(new Tuple2<String, String>("1", "i"));
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("10");
+		expected.add("19");
+		expected.add("12");
+		expected.add("33");
+		expected.add("19");
+		expected.add("ace");
+		expected.add("egi");
+		expected.add("i");
+		expected.add("bdf");
+		expected.add("fh");
+
+		CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String>(
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 0, 0);
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index dd57dfb..b2fc3e5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -35,7 +35,7 @@ import org.junit.Test;
 public class CoGroupedWindowReduceTest {
 
 	private static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>
{
+			CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String>
{
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -45,9 +45,9 @@ public class CoGroupedWindowReduceTest {
 		}
 
 		@Override
-		public Tuple2<String, Integer> reduce2(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) {
-			return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
+		public Tuple2<String, String> reduce2(Tuple2<String, String> value1,
+				Tuple2<String, String> value2) {
+			return new Tuple2<String, String>("a", value1.f1 + value2.f1);
 		}
 
 		@Override
@@ -56,8 +56,8 @@ public class CoGroupedWindowReduceTest {
 		}
 
 		@Override
-		public String map2(Tuple2<String, Integer> value) {
-			return value.f1.toString();
+		public String map2(Tuple2<String, String> value) {
+			return value.f1;
 		}
 	}
 
@@ -84,51 +84,100 @@ public class CoGroupedWindowReduceTest {
 		}
 	}
 
-	List<Long> timestamps = Arrays.asList(0L, 1L, 1L, 2L, 2L, 8L, 8L, 10L);
+	@Test
+	public void coGroupedWindowReduceTest1() {
+
+		List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 1L, 1L, 1L, 2L, 4L, 5L, 6L);
+		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String,
Integer>>();
+		inputs1.add(new Tuple2<String, Integer>("a", 1));
+		inputs1.add(new Tuple2<String, Integer>("a", 2));
+		inputs1.add(new Tuple2<String, Integer>("a", 3));
+		inputs1.add(new Tuple2<String, Integer>("a", 4));
+		inputs1.add(new Tuple2<String, Integer>("a", 5));
+		inputs1.add(new Tuple2<String, Integer>("b", 6));
+		inputs1.add(new Tuple2<String, Integer>("a", 7));
+		inputs1.add(new Tuple2<String, Integer>("b", 8));
+		inputs1.add(new Tuple2<String, Integer>("b", 9));
+		inputs1.add(new Tuple2<String, Integer>("b", 10));
+
+		List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 5L, 5L, 6L, 7L);
+		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
+		inputs2.add(new Tuple2<String, String>("1", "a"));
+		inputs2.add(new Tuple2<String, String>("2", "b"));
+		inputs2.add(new Tuple2<String, String>("1", "c"));
+		inputs2.add(new Tuple2<String, String>("2", "d"));
+		inputs2.add(new Tuple2<String, String>("1", "e"));
+		inputs2.add(new Tuple2<String, String>("2", "f"));
+		inputs2.add(new Tuple2<String, String>("1", "g"));
+		inputs2.add(new Tuple2<String, String>("2", "h"));
+		inputs2.add(new Tuple2<String, String>("1", "i"));
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("6");
+		expected.add("22");
+		expected.add("27");
+		expected.add("ace");
+		expected.add("bd");
+		expected.add("g");
+		expected.add("fh");
+		expected.add("i");
+
+		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String>(
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 0, 0,
+				new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
+				new MyTimeStamp<Tuple2<String, String>>(timestamps2));
+
+		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+
+		Collections.sort(result);
+		Collections.sort(expected);
+		assertEquals(expected, result);
+	}
 
 	@Test
-	public void coGroupedWindowReduceTest() {
+	public void coGroupedWindowReduceTest2() {
 
+		List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 2L, 2L, 3L, 4L, 4L, 5L, 6L);
 		List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String,
Integer>>();
 		inputs1.add(new Tuple2<String, Integer>("a", 1));
 		inputs1.add(new Tuple2<String, Integer>("a", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 2));
-		inputs1.add(new Tuple2<String, Integer>("b", 5));
+		inputs1.add(new Tuple2<String, Integer>("a", 3));
+		inputs1.add(new Tuple2<String, Integer>("a", 4));
+		inputs1.add(new Tuple2<String, Integer>("a", 5));
+		inputs1.add(new Tuple2<String, Integer>("b", 6));
 		inputs1.add(new Tuple2<String, Integer>("a", 7));
+		inputs1.add(new Tuple2<String, Integer>("b", 8));
 		inputs1.add(new Tuple2<String, Integer>("b", 9));
 		inputs1.add(new Tuple2<String, Integer>("b", 10));
 
-		List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String,
Integer>>();
-		inputs2.add(new Tuple2<String, Integer>("a", 1));
-		inputs2.add(new Tuple2<String, Integer>("a", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 5));
-		inputs2.add(new Tuple2<String, Integer>("a", 7));
-		inputs2.add(new Tuple2<String, Integer>("b", 9));
-		inputs2.add(new Tuple2<String, Integer>("b", 10));
+		List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L);
+		List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
+		inputs2.add(new Tuple2<String, String>("1", "a"));
+		inputs2.add(new Tuple2<String, String>("2", "b"));
+		inputs2.add(new Tuple2<String, String>("1", "c"));
+		inputs2.add(new Tuple2<String, String>("2", "d"));
+		inputs2.add(new Tuple2<String, String>("1", "e"));
+		inputs2.add(new Tuple2<String, String>("2", "f"));
+		inputs2.add(new Tuple2<String, String>("1", "g"));
+		inputs2.add(new Tuple2<String, String>("2", "h"));
+		inputs2.add(new Tuple2<String, String>("1", "i"));
 
 		List<String> expected = new ArrayList<String>();
-		expected.add("3");
-		expected.add("9");
+		expected.add("15");
+		expected.add("6");
+		expected.add("16");
+		expected.add("23");
 		expected.add("7");
-		expected.add("7");
-		expected.add("9");
-		expected.add("7");
-		expected.add("19");
-		expected.add("3");
-		expected.add("9");
-		expected.add("7");
-		expected.add("7");
-		expected.add("9");
-		expected.add("7");
-		expected.add("19");
-
-		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>,
String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, Integer>, String>(
-				new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0,
-				new MyTimeStamp<Tuple2<String, Integer>>(timestamps),
-				new MyTimeStamp<Tuple2<String, Integer>>(timestamps));
+		expected.add("27");
+		expected.add("ace");
+		expected.add("bdf");
+		expected.add("egi");
+		expected.add("fh");
+
+		CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>,
String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>,
Tuple2<String, String>, String>(
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 0, 0,
+				new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
+				new MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 6e821b4..494182b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
 
 public class CoWindowReduceTest {
 
-	private static class MyCoReduceFunction implements CoReduceFunction<Integer, Integer,
String> {
+	private static class MyCoReduceFunction implements CoReduceFunction<Integer, String,
String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -42,7 +42,7 @@ public class CoWindowReduceTest {
 		}
 
 		@Override
-		public Integer reduce2(Integer value1, Integer value2) {
+		public String reduce2(String value1, String value2) {
 			return value1 + value2;
 		}
 
@@ -52,8 +52,8 @@ public class CoWindowReduceTest {
 		}
 
 		@Override
-		public String map2(Integer value) {
-			return value.toString();
+		public String map2(String value) {
+			return value;
 		}
 	}
 
@@ -80,36 +80,85 @@ public class CoWindowReduceTest {
 		}
 	}
 
-	List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L);
+	@Test
+	public void coWindowReduceTest1() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+
+		List<String> inputs2 = new ArrayList<String>();
+		inputs2.add("a");
+		inputs2.add("b");
+		inputs2.add("c");
+		inputs2.add("d");
+		inputs2.add("e");
+		inputs2.add("f");
+		inputs2.add("g");
+		inputs2.add("h");
+		inputs2.add("i");
+
+		List<Long> timestamps1 = Arrays.asList(0L, 2L, 3L, 5L, 7L, 9L, 10L, 11L, 11L, 13L);
+		List<Long> timestamps2 = Arrays.asList(0L, 1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L);
+
+		CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer,
String, String>(
+				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new MyTimeStamp<Integer>(timestamps1),
+				new MyTimeStamp<String>(timestamps2));
+
+		List<String> expected = new ArrayList<String>();
+		expected.add("6");
+		expected.add("9");
+		expected.add("30");
+		expected.add("10");
+		expected.add("abcde");
+		expected.add("fghi");
 
-	List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L);
+		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+
+		Collections.sort(result);
+		Collections.sort(expected);
+		assertEquals(expected, result);
+
+	}
 
 	@Test
-	public void coWindowReduceTest() {
+	public void coWindowReduceTest2() {
 
 		List<Integer> inputs = new ArrayList<Integer>();
 		for (Integer i = 1; i <= 10; i++) {
 			inputs.add(i);
 		}
 
-		List<Integer> inputs2 = new ArrayList<Integer>();
-		inputs2.add(1);
-		inputs2.add(2);
-		inputs2.add(-1);
-		inputs2.add(-3);
-		inputs2.add(-4);
+		List<String> inputs2 = new ArrayList<String>();
+		inputs2.add("a");
+		inputs2.add("b");
+		inputs2.add("c");
+		inputs2.add("d");
+		inputs2.add("e");
+		inputs2.add("f");
+		inputs2.add("g");
+		inputs2.add("h");
+		inputs2.add("i");
 
-		CoWindowReduceInvokable<Integer, Integer, String> invokable = new CoWindowReduceInvokable<Integer,
Integer, String>(
-				new MyCoReduceFunction(), 3L, 3L, 2L, 2L, new MyTimeStamp<Integer>(timestamps1),
-				new MyTimeStamp<Integer>(timestamps2));
+		List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 3L, 8L, 10L, 11L);
+		List<Long> timestamps2 = Arrays.asList(1L, 2L, 4L, 5L, 6L, 9L, 10L, 11L, 13L);
+
+		CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer,
String, String>(
+				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new MyTimeStamp<Integer>(timestamps1),
+				new MyTimeStamp<String>(timestamps2));
 
 		List<String> expected = new ArrayList<String>();
 		expected.add("28");
-		expected.add("26");
-		expected.add("9");
-		expected.add("19");
-		expected.add("1");
-		expected.add("-6");
+		expected.add("18");
+		expected.add("8");
+		expected.add("27");
+		expected.add("ab");
+		expected.add("cd");
+		expected.add("de");
+		expected.add("f");
+		expected.add("fgh");
+		expected.add("hi");
 
 		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
 


Mime
View raw message