flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject flink git commit: [FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections
Date Thu, 02 Mar 2017 21:37:37 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 ba5aa10b9 -> 01703e60e


[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections

Conclude OuterJoinOperatorBase#executeOnCollections with a call to
FunctionUtils.closeFunction(function) in order to close rich user
functions.

This closes #3453


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01703e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01703e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01703e60

Branch: refs/heads/release-1.1
Commit: 01703e60e0b583d6d32c2cba395f6199c5773c5e
Parents: ba5aa10
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Mar 1 15:55:48 2017 -0500
Committer: Greg Hogan <code@greghogan.com>
Committed: Thu Mar 2 09:42:22 2017 -0500

----------------------------------------------------------------------
 .../operators/base/OuterJoinOperatorBase.java   |  3 +-
 .../base/OuterJoinOperatorBaseTest.java         | 88 ++++++++++++++++----
 .../graph/library/link_analysis/HITSTest.java   |  2 +-
 3 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index a47a612..003fdca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
 		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
 		FunctionUtils.openFunction(function, this.parameters);
 
-
 		List<OUT> result = new ArrayList<>();
 		Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
 
@@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN
 			function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector);
 		}
 
+		FunctionUtils.closeFunction(function);
+
 		return result;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 69159f2..143a416 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -18,34 +18,63 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class OuterJoinOperatorBaseTest implements Serializable {
 
-	private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
-		@Override
-		public void join(String first, String second, Collector<String> out) throws Exception {
-			out.collect(String.valueOf(first) + ',' + String.valueOf(second));
-		}
-	};
+	private MockRichFlatJoinFunction joiner;
+
+	private OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator;
+
+	private ExecutionConfig executionConfig;
+
+	private RuntimeContext runtimeContext;
 
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator =
+	@Before
+	public void setup() {
+		joiner = new MockRichFlatJoinFunction();
+
+		baseOperator =
 			new OuterJoinOperatorBase(joiner,
-					new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
-							BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+				new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+
+		executionConfig = new ExecutionConfig();
+
+		String taskName = "Test rich outer join function";
+		TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+		HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>();
+		HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+		runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks,
+			accumulatorMap, new UnregisteredMetricsGroup());
+	}
 
 	@Test
 	public void testFullOuterJoinWithoutMatchingPartners() throws Exception {
@@ -131,18 +160,41 @@ public class OuterJoinOperatorBaseTest implements Serializable {
 		baseOperator.setOuterJoinType(null);
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 	}
 
 	private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception {
-		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.disableObjectReuse();
-		List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 		executionConfig.enableObjectReuse();
-		List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+		List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig);
 
 		assertEquals(expected, resultSafe);
 		assertEquals(expected, resultRegular);
+
+		assertTrue(joiner.opened.get());
+		assertTrue(joiner.closed.get());
 	}
 
-}
\ No newline at end of file
+	private static class MockRichFlatJoinFunction extends RichFlatJoinFunction<String, String, String> {
+		final AtomicBoolean opened = new AtomicBoolean(false);
+		final AtomicBoolean closed = new AtomicBoolean(false);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			opened.compareAndSet(false, true);
+			assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception{
+			closed.compareAndSet(false, true);
+		}
+
+		@Override
+		public void join(String first, String second, Collector<String> out) throws Exception {
+			out.collect(String.valueOf(first) + ',' + String.valueOf(second));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 1551d84..590fc0e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -80,7 +80,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new HITS<LongValue, NullValue, NullValue>(0.000001)));
+			.run(new HITS<LongValue, NullValue, NullValue>(1)));
 
 		assertEquals(902, checksum.getCount());
 		assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());


Mime
View raw message