Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 97F54200C2B for ; Thu, 2 Mar 2017 21:47:40 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 96719160B6F; Thu, 2 Mar 2017 20:47:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BADAE160B6A for ; Thu, 2 Mar 2017 21:47:39 +0100 (CET) Received: (qmail 52043 invoked by uid 500); 2 Mar 2017 20:47:39 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 52034 invoked by uid 99); 2 Mar 2017 20:47:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2017 20:47:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C1385DFDAE; Thu, 2 Mar 2017 20:47:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Message-Id: <41dc659fdb7d46bf99c3c93cdc0159a6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections Date: Thu, 2 Mar 2017 20:47:38 +0000 (UTC) archived-at: Thu, 02 Mar 2017 20:47:40 -0000 Repository: flink Updated Branches: refs/heads/release-1.2 c22efce09 -> 54a02d9a4 [FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections Conclude OuterJoinOperatorBase#executeOnCollections with a call to FunctionUtils.closeFunction(function) in order to close rich user functions. This closes #3453 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54a02d9a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54a02d9a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54a02d9a Branch: refs/heads/release-1.2 Commit: 54a02d9a4b81aeb462f958bdeda0aaa509357677 Parents: c22efce Author: Greg Hogan Authored: Wed Mar 1 15:55:48 2017 -0500 Committer: Greg Hogan Committed: Thu Mar 2 09:37:34 2017 -0500 ---------------------------------------------------------------------- .../operators/base/OuterJoinOperatorBase.java | 3 +- .../base/OuterJoinOperatorBaseTest.java | 80 ++++++++++++++++---- .../graph/library/link_analysis/HITSTest.java | 2 +- 3 files changed, 69 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/54a02d9a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java index a47a612..003fdca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java @@ -103,7 +103,6 @@ public class OuterJoinOperatorBase result = new ArrayList<>(); Collector collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig)); @@ -113,6 +112,8 @@ public class OuterJoinOperatorBase joiner = new FlatJoinFunction() { - @Override - public void join(String first, String second, Collector out) throws Exception { - out.collect(String.valueOf(first) + ',' + String.valueOf(second)); - } - }; + private MockRichFlatJoinFunction joiner; + + private OuterJoinOperatorBase> baseOperator; + + private ExecutionConfig executionConfig; + + private RuntimeContext runtimeContext; @SuppressWarnings({"rawtypes", "unchecked"}) - private final OuterJoinOperatorBase> baseOperator = + @Before + public void setup() { + joiner = new MockRichFlatJoinFunction(); + + baseOperator = new OuterJoinOperatorBase(joiner, - new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + + executionConfig = new ExecutionConfig(); + + String taskName = "Test rich outer join function"; + TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0); + HashMap> accumulatorMap = new HashMap<>(); + HashMap> cpTasks = new HashMap<>(); + + runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()); + } @Test public void testFullOuterJoinWithoutMatchingPartners() throws Exception { @@ -132,18 +161,41 @@ public class OuterJoinOperatorBaseTest implements Serializable { baseOperator.setOuterJoinType(null); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); } private void testOuterJoin(List leftInput, List rightInput, List expected) throws Exception { - ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); + + assertTrue(joiner.opened.get()); + assertTrue(joiner.closed.get()); } -} \ No newline at end of file + private static class MockRichFlatJoinFunction extends RichFlatJoinFunction { + final AtomicBoolean opened = new AtomicBoolean(false); + final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void open(Configuration parameters) throws Exception { + opened.compareAndSet(false, true); + assertEquals(0, getRuntimeContext().getIndexOfThisSubtask()); + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void close() throws Exception{ + closed.compareAndSet(false, true); + } + + @Override + public void join(String first, String second, Collector out) throws Exception { + out.collect(String.valueOf(first) + ',' + String.valueOf(second)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54a02d9a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java index 1551d84..590fc0e 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -80,7 +80,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new HITS(0.000001))); + .run(new HITS(1))); assertEquals(902, checksum.getCount()); assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());