Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61D7617EA9 for ; Wed, 1 Oct 2014 15:45:33 +0000 (UTC) Received: (qmail 2956 invoked by uid 500); 1 Oct 2014 15:45:33 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 2933 invoked by uid 500); 1 Oct 2014 15:45:33 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 2924 invoked by uid 99); 1 Oct 2014 15:45:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Oct 2014 15:45:33 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 01 Oct 2014 15:45:04 +0000 Received: (qmail 2656 invoked by uid 99); 1 Oct 2014 15:45:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Oct 2014 15:45:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 846DF882F9B; Wed, 1 Oct 2014 15:45:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.incubator.apache.org Date: Wed, 01 Oct 2014 15:45:05 -0000 Message-Id: <528cf1a95b99440bb076d4b50de360bc@git.apache.org> In-Reply-To: <1d743cdf5f7942c1a6132cd341006d7b@git.apache.org> References: <1d743cdf5f7942c1a6132cd341006d7b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] git commit: [streaming] Improved tests for CoReduceInvokables X-Virus-Checked: Checked by ClamAV on apache.org [streaming] Improved tests for CoReduceInvokables Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9e722dfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9e722dfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9e722dfd Branch: refs/heads/master Commit: 9e722dfd5d2b9aa8046d4fe33167daf938ee5a3b Parents: 127470b Author: szape Authored: Tue Sep 30 20:20:18 2014 +0200 Committer: mbalassi Committed: Wed Oct 1 17:11:38 2014 +0200 ---------------------------------------------------------------------- .../invokable/operator/CoBatchReduceTest.java | 83 +++++++++--- .../operator/CoGroupedBatchReduceTest.java | 108 +++++++++++----- .../operator/CoGroupedWindowReduceTest.java | 125 +++++++++++++------ .../invokable/operator/CoWindowReduceTest.java | 91 ++++++++++---- 4 files changed, 299 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java index 1741bb4..fd7439f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java @@ -30,7 +30,7 @@ import org.junit.Test; public class CoBatchReduceTest { - private static class MyCoReduceFunction implements CoReduceFunction { + private static class MyCoReduceFunction implements CoReduceFunction { private static final long serialVersionUID = 1L; @Override @@ -39,7 +39,7 @@ public class CoBatchReduceTest { } @Override - public Integer reduce2(Integer value1, Integer value2) { + public String reduce2(String value1, String value2) { return value1 + value2; } @@ -49,38 +49,83 @@ public class CoBatchReduceTest { } @Override - public String map2(Integer value) { - return value.toString(); + public String map2(String value) { + return value; } } @Test - public void coBatchReduceTest() { + public void coBatchReduceTest1() { + + List inputs = new ArrayList(); + for (Integer i = 1; i <= 10; i++) { + inputs.add(i); + } + + List inputs2 = new ArrayList(); + inputs2.add("a"); + inputs2.add("b"); + inputs2.add("c"); + inputs2.add("d"); + inputs2.add("e"); + inputs2.add("f"); + inputs2.add("g"); + inputs2.add("h"); + inputs2.add("i"); + + CoBatchReduceInvokable invokable = new CoBatchReduceInvokable( + new MyCoReduceFunction(), 4L, 3L, 4L, 3L); + + List expected = new ArrayList(); + expected.add("10"); + expected.add("26"); + expected.add("19"); + expected.add("abc"); + expected.add("def"); + expected.add("ghi"); + + List result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + + Collections.sort(result); + Collections.sort(expected); + + assertEquals(expected, result); + + } + + @Test + public void coBatchReduceTest2() { List inputs = new ArrayList(); for (Integer i = 1; i <= 10; i++) { inputs.add(i); } - List inputs2 = new ArrayList(); - inputs2.add(1); - inputs2.add(2); - inputs2.add(-1); - inputs2.add(-3); - inputs2.add(-4); + List inputs2 = new ArrayList(); + inputs2.add("a"); + inputs2.add("b"); + inputs2.add("c"); + inputs2.add("d"); + inputs2.add("e"); + inputs2.add("f"); + inputs2.add("g"); + inputs2.add("h"); + inputs2.add("i"); - CoBatchReduceInvokable invokable = new CoBatchReduceInvokable( - new MyCoReduceFunction(), 3L, 3L, 2L, 2L); + CoBatchReduceInvokable invokable = new CoBatchReduceInvokable( + new MyCoReduceFunction(), 4L, 3L, 2L, 2L); List expected = new ArrayList(); - expected.add("6"); - expected.add("12"); + expected.add("10"); expected.add("18"); - expected.add("24"); + expected.add("26"); + expected.add("34"); expected.add("19"); - expected.add("2"); - expected.add("-8"); - expected.add("-4"); + expected.add("abc"); + expected.add("cde"); + expected.add("efg"); + expected.add("ghi"); + expected.add("i"); List result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java index 31b8348..0689fca 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java @@ -32,7 +32,7 @@ import org.junit.Test; public class CoGroupedBatchReduceTest { private static class MyCoReduceFunction implements - CoReduceFunction, Tuple2, String> { + CoReduceFunction, Tuple2, String> { private static final long serialVersionUID = 1L; @Override @@ -42,9 +42,9 @@ public class CoGroupedBatchReduceTest { } @Override - public Tuple2 reduce2(Tuple2 value1, - Tuple2 value2) { - return new Tuple2("a", value1.f1 + value2.f1); + public Tuple2 reduce2(Tuple2 value1, + Tuple2 value2) { + return new Tuple2("a", value1.f1 + value2.f1); } @Override @@ -53,48 +53,96 @@ public class CoGroupedBatchReduceTest { } @Override - public String map2(Tuple2 value) { - return value.f1.toString(); + public String map2(Tuple2 value) { + return value.f1; } } @Test - public void coGroupedBatchReduceTest() { + public void coGroupedBatchReduceTest1() { List> inputs1 = new ArrayList>(); inputs1.add(new Tuple2("a", 1)); inputs1.add(new Tuple2("a", 2)); - inputs1.add(new Tuple2("b", 2)); - inputs1.add(new Tuple2("b", 2)); - inputs1.add(new Tuple2("b", 5)); + inputs1.add(new Tuple2("a", 3)); + inputs1.add(new Tuple2("a", 4)); + inputs1.add(new Tuple2("a", 5)); + inputs1.add(new Tuple2("b", 6)); inputs1.add(new Tuple2("a", 7)); + inputs1.add(new Tuple2("b", 8)); inputs1.add(new Tuple2("b", 9)); inputs1.add(new Tuple2("b", 10)); - List> inputs2 = new ArrayList>(); - inputs2.add(new Tuple2("a", 1)); - inputs2.add(new Tuple2("a", 2)); - inputs2.add(new Tuple2("b", 2)); - inputs2.add(new Tuple2("b", 2)); - inputs2.add(new Tuple2("b", 5)); - inputs2.add(new Tuple2("a", 7)); - inputs2.add(new Tuple2("b", 9)); - inputs2.add(new Tuple2("b", 10)); + List> inputs2 = new ArrayList>(); + inputs2.add(new Tuple2("1", "a")); + inputs2.add(new Tuple2("2", "b")); + inputs2.add(new Tuple2("1", "c")); + inputs2.add(new Tuple2("2", "d")); + inputs2.add(new Tuple2("1", "e")); + inputs2.add(new Tuple2("2", "f")); + inputs2.add(new Tuple2("1", "g")); + inputs2.add(new Tuple2("2", "h")); + inputs2.add(new Tuple2("1", "i")); List expected = new ArrayList(); expected.add("10"); - expected.add("7"); - expected.add("9"); - expected.add("24"); - expected.add("10"); - expected.add("10"); - expected.add("7"); - expected.add("9"); - expected.add("24"); - expected.add("10"); + expected.add("12"); + expected.add("33"); + expected.add("ace"); + expected.add("gi"); + expected.add("bdf"); + expected.add("h"); + + CoGroupedBatchReduceInvokable, Tuple2, String> invokable = new CoGroupedBatchReduceInvokable, Tuple2, String>( + new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 0, 0); + + List result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + + Collections.sort(result); + Collections.sort(expected); + assertEquals(expected, result); + } + + @Test + public void coGroupedBatchReduceTest2() { - CoGroupedBatchReduceInvokable, Tuple2, String> invokable = new CoGroupedBatchReduceInvokable, Tuple2, String>( - new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0); + List> inputs1 = new ArrayList>(); + inputs1.add(new Tuple2("a", 1)); + inputs1.add(new Tuple2("a", 2)); + inputs1.add(new Tuple2("a", 3)); + inputs1.add(new Tuple2("a", 4)); + inputs1.add(new Tuple2("a", 5)); + inputs1.add(new Tuple2("b", 6)); + inputs1.add(new Tuple2("a", 7)); + inputs1.add(new Tuple2("b", 8)); + inputs1.add(new Tuple2("b", 9)); + inputs1.add(new Tuple2("b", 10)); + + List> inputs2 = new ArrayList>(); + inputs2.add(new Tuple2("1", "a")); + inputs2.add(new Tuple2("2", "b")); + inputs2.add(new Tuple2("1", "c")); + inputs2.add(new Tuple2("2", "d")); + inputs2.add(new Tuple2("1", "e")); + inputs2.add(new Tuple2("2", "f")); + inputs2.add(new Tuple2("1", "g")); + inputs2.add(new Tuple2("2", "h")); + inputs2.add(new Tuple2("1", "i")); + + List expected = new ArrayList(); + expected.add("10"); + expected.add("19"); + expected.add("12"); + expected.add("33"); + expected.add("19"); + expected.add("ace"); + expected.add("egi"); + expected.add("i"); + expected.add("bdf"); + expected.add("fh"); + + CoGroupedBatchReduceInvokable, Tuple2, String> invokable = new CoGroupedBatchReduceInvokable, Tuple2, String>( + new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 0, 0); List result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java index dd57dfb..b2fc3e5 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java @@ -35,7 +35,7 @@ import org.junit.Test; public class CoGroupedWindowReduceTest { private static class MyCoReduceFunction implements - CoReduceFunction, Tuple2, String> { + CoReduceFunction, Tuple2, String> { private static final long serialVersionUID = 1L; @Override @@ -45,9 +45,9 @@ public class CoGroupedWindowReduceTest { } @Override - public Tuple2 reduce2(Tuple2 value1, - Tuple2 value2) { - return new Tuple2("a", value1.f1 + value2.f1); + public Tuple2 reduce2(Tuple2 value1, + Tuple2 value2) { + return new Tuple2("a", value1.f1 + value2.f1); } @Override @@ -56,8 +56,8 @@ public class CoGroupedWindowReduceTest { } @Override - public String map2(Tuple2 value) { - return value.f1.toString(); + public String map2(Tuple2 value) { + return value.f1; } } @@ -84,51 +84,100 @@ public class CoGroupedWindowReduceTest { } } - List timestamps = Arrays.asList(0L, 1L, 1L, 2L, 2L, 8L, 8L, 10L); + @Test + public void coGroupedWindowReduceTest1() { + + List timestamps1 = Arrays.asList(0L, 0L, 1L, 1L, 1L, 1L, 2L, 4L, 5L, 6L); + List> inputs1 = new ArrayList>(); + inputs1.add(new Tuple2("a", 1)); + inputs1.add(new Tuple2("a", 2)); + inputs1.add(new Tuple2("a", 3)); + inputs1.add(new Tuple2("a", 4)); + inputs1.add(new Tuple2("a", 5)); + inputs1.add(new Tuple2("b", 6)); + inputs1.add(new Tuple2("a", 7)); + inputs1.add(new Tuple2("b", 8)); + inputs1.add(new Tuple2("b", 9)); + inputs1.add(new Tuple2("b", 10)); + + List timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 5L, 5L, 6L, 7L); + List> inputs2 = new ArrayList>(); + inputs2.add(new Tuple2("1", "a")); + inputs2.add(new Tuple2("2", "b")); + inputs2.add(new Tuple2("1", "c")); + inputs2.add(new Tuple2("2", "d")); + inputs2.add(new Tuple2("1", "e")); + inputs2.add(new Tuple2("2", "f")); + inputs2.add(new Tuple2("1", "g")); + inputs2.add(new Tuple2("2", "h")); + inputs2.add(new Tuple2("1", "i")); + + List expected = new ArrayList(); + expected.add("6"); + expected.add("22"); + expected.add("27"); + expected.add("ace"); + expected.add("bd"); + expected.add("g"); + expected.add("fh"); + expected.add("i"); + + CoGroupedWindowReduceInvokable, Tuple2, String> invokable = new CoGroupedWindowReduceInvokable, Tuple2, String>( + new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 0, 0, + new MyTimeStamp>(timestamps1), + new MyTimeStamp>(timestamps2)); + + List result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + + Collections.sort(result); + Collections.sort(expected); + assertEquals(expected, result); + } @Test - public void coGroupedWindowReduceTest() { + public void coGroupedWindowReduceTest2() { + List timestamps1 = Arrays.asList(0L, 0L, 1L, 2L, 2L, 3L, 4L, 4L, 5L, 6L); List> inputs1 = new ArrayList>(); inputs1.add(new Tuple2("a", 1)); inputs1.add(new Tuple2("a", 2)); - inputs1.add(new Tuple2("b", 2)); - inputs1.add(new Tuple2("b", 2)); - inputs1.add(new Tuple2("b", 5)); + inputs1.add(new Tuple2("a", 3)); + inputs1.add(new Tuple2("a", 4)); + inputs1.add(new Tuple2("a", 5)); + inputs1.add(new Tuple2("b", 6)); inputs1.add(new Tuple2("a", 7)); + inputs1.add(new Tuple2("b", 8)); inputs1.add(new Tuple2("b", 9)); inputs1.add(new Tuple2("b", 10)); - List> inputs2 = new ArrayList>(); - inputs2.add(new Tuple2("a", 1)); - inputs2.add(new Tuple2("a", 2)); - inputs2.add(new Tuple2("b", 2)); - inputs2.add(new Tuple2("b", 2)); - inputs2.add(new Tuple2("b", 5)); - inputs2.add(new Tuple2("a", 7)); - inputs2.add(new Tuple2("b", 9)); - inputs2.add(new Tuple2("b", 10)); + List timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L); + List> inputs2 = new ArrayList>(); + inputs2.add(new Tuple2("1", "a")); + inputs2.add(new Tuple2("2", "b")); + inputs2.add(new Tuple2("1", "c")); + inputs2.add(new Tuple2("2", "d")); + inputs2.add(new Tuple2("1", "e")); + inputs2.add(new Tuple2("2", "f")); + inputs2.add(new Tuple2("1", "g")); + inputs2.add(new Tuple2("2", "h")); + inputs2.add(new Tuple2("1", "i")); List expected = new ArrayList(); - expected.add("3"); - expected.add("9"); + expected.add("15"); + expected.add("6"); + expected.add("16"); + expected.add("23"); expected.add("7"); - expected.add("7"); - expected.add("9"); - expected.add("7"); - expected.add("19"); - expected.add("3"); - expected.add("9"); - expected.add("7"); - expected.add("7"); - expected.add("9"); - expected.add("7"); - expected.add("19"); - - CoGroupedWindowReduceInvokable, Tuple2, String> invokable = new CoGroupedWindowReduceInvokable, Tuple2, String>( - new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0, - new MyTimeStamp>(timestamps), - new MyTimeStamp>(timestamps)); + expected.add("27"); + expected.add("ace"); + expected.add("bdf"); + expected.add("egi"); + expected.add("fh"); + + CoGroupedWindowReduceInvokable, Tuple2, String> invokable = new CoGroupedWindowReduceInvokable, Tuple2, String>( + new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 0, 0, + new MyTimeStamp>(timestamps1), + new MyTimeStamp>(timestamps2)); List result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e722dfd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java index 6e821b4..494182b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java @@ -33,7 +33,7 @@ import org.junit.Test; public class CoWindowReduceTest { - private static class MyCoReduceFunction implements CoReduceFunction { + private static class MyCoReduceFunction implements CoReduceFunction { private static final long serialVersionUID = 1L; @Override @@ -42,7 +42,7 @@ public class CoWindowReduceTest { } @Override - public Integer reduce2(Integer value1, Integer value2) { + public String reduce2(String value1, String value2) { return value1 + value2; } @@ -52,8 +52,8 @@ public class CoWindowReduceTest { } @Override - public String map2(Integer value) { - return value.toString(); + public String map2(String value) { + return value; } } @@ -80,36 +80,85 @@ public class CoWindowReduceTest { } } - List timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L); + @Test + public void coWindowReduceTest1() { + + List inputs = new ArrayList(); + for (Integer i = 1; i <= 10; i++) { + inputs.add(i); + } + + List inputs2 = new ArrayList(); + inputs2.add("a"); + inputs2.add("b"); + inputs2.add("c"); + inputs2.add("d"); + inputs2.add("e"); + inputs2.add("f"); + inputs2.add("g"); + inputs2.add("h"); + inputs2.add("i"); + + List timestamps1 = Arrays.asList(0L, 2L, 3L, 5L, 7L, 9L, 10L, 11L, 11L, 13L); + List timestamps2 = Arrays.asList(0L, 1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L); + + CoWindowReduceInvokable invokable = new CoWindowReduceInvokable( + new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new MyTimeStamp(timestamps1), + new MyTimeStamp(timestamps2)); + + List expected = new ArrayList(); + expected.add("6"); + expected.add("9"); + expected.add("30"); + expected.add("10"); + expected.add("abcde"); + expected.add("fghi"); - List timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L); + List result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + + Collections.sort(result); + Collections.sort(expected); + assertEquals(expected, result); + + } @Test - public void coWindowReduceTest() { + public void coWindowReduceTest2() { List inputs = new ArrayList(); for (Integer i = 1; i <= 10; i++) { inputs.add(i); } - List inputs2 = new ArrayList(); - inputs2.add(1); - inputs2.add(2); - inputs2.add(-1); - inputs2.add(-3); - inputs2.add(-4); + List inputs2 = new ArrayList(); + inputs2.add("a"); + inputs2.add("b"); + inputs2.add("c"); + inputs2.add("d"); + inputs2.add("e"); + inputs2.add("f"); + inputs2.add("g"); + inputs2.add("h"); + inputs2.add("i"); - CoWindowReduceInvokable invokable = new CoWindowReduceInvokable( - new MyCoReduceFunction(), 3L, 3L, 2L, 2L, new MyTimeStamp(timestamps1), - new MyTimeStamp(timestamps2)); + List timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 3L, 8L, 10L, 11L); + List timestamps2 = Arrays.asList(1L, 2L, 4L, 5L, 6L, 9L, 10L, 11L, 13L); + + CoWindowReduceInvokable invokable = new CoWindowReduceInvokable( + new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new MyTimeStamp(timestamps1), + new MyTimeStamp(timestamps2)); List expected = new ArrayList(); expected.add("28"); - expected.add("26"); - expected.add("9"); - expected.add("19"); - expected.add("1"); - expected.add("-6"); + expected.add("18"); + expected.add("8"); + expected.add("27"); + expected.add("ab"); + expected.add("cd"); + expected.add("de"); + expected.add("f"); + expected.add("fgh"); + expected.add("hi"); List result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);