Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A9966119D6 for ; Fri, 29 Aug 2014 19:04:03 +0000 (UTC) Received: (qmail 47577 invoked by uid 500); 29 Aug 2014 19:04:03 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 47555 invoked by uid 500); 29 Aug 2014 19:04:03 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 47546 invoked by uid 99); 29 Aug 2014 19:04:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Aug 2014 19:04:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 29 Aug 2014 19:03:55 +0000 Received: (qmail 43871 invoked by uid 99); 29 Aug 2014 19:03:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Aug 2014 19:03:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2B6DC9A88A9; Fri, 29 Aug 2014 19:03:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Fri, 29 Aug 2014 19:03:54 -0000 Message-Id: <4eb358c3a8ec443580b9b75b465b9563@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/28] git commit: [streaming] Tests added for Reduce, GroupReduce and BatchGroupReduce invokables X-Virus-Checked: Checked by ClamAV on apache.org [streaming] Tests added for Reduce, GroupReduce and BatchGroupReduce invokables Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d5d97068 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d5d97068 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d5d97068 Branch: refs/heads/master Commit: d5d970683e4f40016efba1b44f705b1f8e86c1db Parents: 4207c5f Author: gyfora Authored: Wed Aug 27 16:32:39 2014 +0200 Committer: Stephan Ewen Committed: Fri Aug 29 21:01:57 2014 +0200 ---------------------------------------------------------------------- .../operator/BatchGroupReduceTest.java | 96 ++++++++++++++++++++ .../operator/GroupReduceInvokableTest.java | 54 +++++++++++ .../invokable/operator/StreamReduceTest.java | 54 +++++++++++ .../flink/streaming/util/MockInvokable.java | 1 + 4 files changed, 205 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java new file mode 100755 index 0000000..bd6bfba --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class BatchGroupReduceTest { + + public static final class MySlidingBatchReduce1 implements GroupReduceFunction { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable values, Collector out) throws Exception { + for (Integer value : values) { + out.collect(value.toString()); + } + out.collect(END_OF_GROUP); + } + } + + public static final class MySlidingBatchReduce2 extends + RichGroupReduceFunction, String> { + private static final long serialVersionUID = 1L; + + String openString; + + @Override + public void reduce(Iterable> values, Collector out) + throws Exception { + out.collect(openString); + for (Tuple2 value : values) { + out.collect(value.f0.toString()); + } + out.collect(END_OF_GROUP); + } + + @Override + public void open(Configuration c){ + openString = "open"; + } + } + + private final static String END_OF_GROUP = "end of group"; + + @SuppressWarnings("unchecked") + @Test + public void slidingBatchGroupReduceTest() { + BatchGroupReduceInvokable invokable1 = new BatchGroupReduceInvokable( + new MySlidingBatchReduce1(), 3, 2, 0); + + List expected = Arrays.asList("1", "1", END_OF_GROUP, "2", END_OF_GROUP, "2", + END_OF_GROUP, "3", "3", END_OF_GROUP); + List actual = MockInvokable.createAndExecute(invokable1, + Arrays.asList(1, 1, 2, 3, 3)); + + assertEquals(expected, actual); + + BatchGroupReduceInvokable, String> invokable2 = new BatchGroupReduceInvokable, String>( + new MySlidingBatchReduce2(), 2, 2, 1); + + expected = Arrays.asList("open","1", "2", END_OF_GROUP,"open", "3", END_OF_GROUP,"open", "4", END_OF_GROUP); + actual = MockInvokable.createAndExecute(invokable2, Arrays.asList( + new Tuple2(1, "a"), new Tuple2(2, "a"), + new Tuple2(3, "b"), new Tuple2(4, "a"))); + + assertEquals(expected, actual); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java new file mode 100755 index 0000000..ae28034 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokableTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.util.MockInvokable; +import org.junit.Test; + +public class GroupReduceInvokableTest { + + private static class MyReducer implements ReduceFunction{ + + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1+value2; + } + + } + + @Test + public void test() { + GroupReduceInvokable invokable1 = new GroupReduceInvokable( + new MyReducer(),0); + + List expected = Arrays.asList(1,2,2,4,3); + List actual = MockInvokable.createAndExecute(invokable1, + Arrays.asList(1, 1, 2, 2, 3)); + + assertEquals(expected, actual); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java new file mode 100755 index 0000000..994cbb0 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.util.MockInvokable; +import org.junit.Test; + +public class StreamReduceTest { + + private static class MyReducer implements ReduceFunction{ + + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1+value2; + } + + } + + @Test + public void test() { + StreamReduceInvokable invokable1 = new StreamReduceInvokable( + new MyReducer()); + + List expected = Arrays.asList(1,2,4,7,10); + List actual = MockInvokable.createAndExecute(invokable1, + Arrays.asList(1, 1, 2, 3, 3)); + + assertEquals(expected, actual); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d97068/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java index 1ea78e1..dd8b029 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java @@ -94,6 +94,7 @@ public class MockInvokable { try { invokable.open(null); invokable.invoke(); + invokable.close(); } catch (Exception e) { throw new RuntimeException("Cannot invoke invokable.", e); }