flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [22/23] git commit: [FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context
Date Fri, 03 Oct 2014 16:25:18 GMT
[FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/65bf092d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/65bf092d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/65bf092d

Branch: refs/heads/master
Commit: 65bf092da77ca0d416a3abbcac21b641cf038101
Parents: ff5ddd5
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Oct 3 16:42:12 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 16:42:12 2014 +0200

----------------------------------------------------------------------
 .../api/common/operators/CollectionExecutor.java    | 16 ++++++++++------
 .../base/FlatMapOperatorCollectionTest.java         |  2 +-
 .../common/operators/base/JoinOperatorBaseTest.java |  4 ++--
 .../api/common/operators/base/MapOperatorTest.java  |  4 ++--
 .../operators/base/PartitionMapOperatorTest.java    |  4 ++--
 .../base/CoGroupOperatorCollectionTest.java         |  2 +-
 .../operators/base/GroupReduceOperatorTest.java     |  4 ++--
 .../common/operators/base/JoinOperatorBaseTest.java |  4 ++--
 .../common/operators/base/ReduceOperatorTest.java   |  4 ++--
 9 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index d204491..ac8e1f3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -66,6 +66,8 @@ public class CollectionExecutor {
 	
 	private final Map<String, Aggregator<?>> aggregators;
 	
+	private final ClassLoader classLoader;
+	
 	private final boolean mutableObjectSafeMode;
 	
 	// --------------------------------------------------------------------------------------------
@@ -81,6 +83,8 @@ public class CollectionExecutor {
 		this.accumulators = new HashMap<String, Accumulator<?,?>>();
 		this.previousAggregates = new HashMap<String, Value>();
 		this.aggregators = new HashMap<String, Aggregator<?>>();
+		
+		this.classLoader = getClass().getClassLoader();
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -181,8 +185,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
{
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader())
:
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet())
{
 				List<?> bcData = execute(bcInputs.getValue());
@@ -223,8 +227,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
{
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) :
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet())
{
 				List<?> bcData = execute(bcInputs.getValue());
@@ -478,8 +482,8 @@ public class CollectionExecutor {
 
 		private final int superstep;
 
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex,
int superstep) {
-			super(name, numParallelSubtasks, subtaskIndex);
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex,
int superstep, ClassLoader classloader) {
+			super(name, numParallelSubtasks, subtaskIndex, classloader);
 			this.superstep = superstep;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index 74dc889..9a0a2b5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -65,7 +65,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 	private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<String>
input, boolean mutableSafe) throws Exception {
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe);
+				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe);
 
 		Assert.assertEquals(input.size(), result.size());
 		Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 8834989..0ab8e72 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -110,8 +110,8 @@ public class JoinOperatorBaseTest implements Serializable {
 
 
 		try {
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new
RuntimeUDFContext(taskName, 1, 0), true);
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2,
new RuntimeUDFContext(taskName, 1, 0), false);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new
RuntimeUDFContext(taskName, 1, 0, null), true);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2,
new RuntimeUDFContext(taskName, 1, 0, null), false);
 
 			assertEquals(expected, resultSafe);
 			assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 82778c5..1a742b6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -97,8 +97,8 @@ public class MapOperatorTest implements java.io.Serializable {
 					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO), taskName);
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5",
"6"));
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0), true);
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0), false);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0, null), true);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0, null), false);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 1c17fde..dadd1ca 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -75,8 +75,8 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5",
"6"));
 			
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0), true);
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0), false);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0, null), true);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0, null), false);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 053b8e2..51d4b0e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -66,7 +66,7 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 							.build()
 			);
 
-			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0);
+			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null);
 
 			{
 				SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 5d1ca17..cfca5aa 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -155,8 +155,8 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0), true);
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0), false);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0, null), true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0, null), false);
 			
 			
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String,
Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index d9abf14..b4ef54f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -101,8 +101,8 @@ public class JoinOperatorBaseTest implements Serializable {
 		));
 
 		try {
-			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1,
inputData2, new RuntimeUDFContext("op", 1, 0), true);
-			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1,
inputData2, new RuntimeUDFContext("op", 1, 0), false);
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1,
inputData2, new RuntimeUDFContext("op", 1, 0, null), true);
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1,
inputData2, new RuntimeUDFContext("op", 1, 0, null), false);
 
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 2baf57e..90bbe41 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -132,8 +132,8 @@ public class ReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0), true);
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0), false);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0, null), true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskName, 1, 0, null), false);
 
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String,
Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String,
Integer>>(resultRegular);


Mime
View raw message