Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E729B1759A for ; Fri, 3 Oct 2014 16:25:40 +0000 (UTC) Received: (qmail 56361 invoked by uid 500); 3 Oct 2014 16:25:40 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 56339 invoked by uid 500); 3 Oct 2014 16:25:40 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 56330 invoked by uid 99); 3 Oct 2014 16:25:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2014 16:25:40 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 03 Oct 2014 16:25:00 +0000 Received: (qmail 52426 invoked by uid 99); 3 Oct 2014 16:24:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2014 16:24:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 73699A1F6E8; Fri, 3 Oct 2014 16:24:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Fri, 03 Oct 2014 16:25:18 -0000 Message-Id: In-Reply-To: <3d730bb465384bccbb11d3dc4889e04f@git.apache.org> References: <3d730bb465384bccbb11d3dc4889e04f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/23] git commit: [FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context X-Virus-Checked: Checked by ClamAV on apache.org [FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/65bf092d Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/65bf092d Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/65bf092d Branch: refs/heads/master Commit: 65bf092da77ca0d416a3abbcac21b641cf038101 Parents: ff5ddd5 Author: Stephan Ewen Authored: Fri Oct 3 16:42:12 2014 +0200 Committer: Stephan Ewen Committed: Fri Oct 3 16:42:12 2014 +0200 ---------------------------------------------------------------------- .../api/common/operators/CollectionExecutor.java | 16 ++++++++++------ .../base/FlatMapOperatorCollectionTest.java | 2 +- .../common/operators/base/JoinOperatorBaseTest.java | 4 ++-- .../api/common/operators/base/MapOperatorTest.java | 4 ++-- .../operators/base/PartitionMapOperatorTest.java | 4 ++-- .../base/CoGroupOperatorCollectionTest.java | 2 +- .../operators/base/GroupReduceOperatorTest.java | 4 ++-- .../common/operators/base/JoinOperatorBaseTest.java | 4 ++-- .../common/operators/base/ReduceOperatorTest.java | 4 ++-- 9 files changed, 24 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index d204491..ac8e1f3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -66,6 +66,8 @@ public class CollectionExecutor { private final Map> aggregators; + private final ClassLoader classLoader; + private final boolean mutableObjectSafeMode; // -------------------------------------------------------------------------------------------- @@ -81,6 +83,8 @@ public class CollectionExecutor { this.accumulators = new HashMap>(); this.previousAggregates = new HashMap(); this.aggregators = new HashMap>(); + + this.classLoader = getClass().getClassLoader(); } // -------------------------------------------------------------------------------------------- @@ -181,8 +185,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -223,8 +227,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -478,8 +482,8 @@ public class CollectionExecutor { private final int superstep; - public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep) { - super(name, numParallelSubtasks, subtaskIndex); + public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) { + super(name, numParallelSubtasks, subtaskIndex, classloader); this.superstep = superstep; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 74dc889..9a0a2b5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -65,7 +65,7 @@ public class FlatMapOperatorCollectionTest implements Serializable { private void testExecuteOnCollection(FlatMapFunction udf, List input, boolean mutableSafe) throws Exception { // run on collections final List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 8834989..0ab8e72 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -110,8 +110,8 @@ public class JoinOperatorBaseTest implements Serializable { try { - List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 82778c5..1a742b6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -97,8 +97,8 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 1c17fde..dadd1ca 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -75,8 +75,8 @@ public class PartitionMapOperatorTest implements java.io.Serializable { List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 053b8e2..51d4b0e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -66,7 +66,7 @@ public class CoGroupOperatorCollectionTest implements Serializable { .build() ); - final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0); + final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null); { SumCoGroup udf1 = new SumCoGroup(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 5d1ca17..cfca5aa 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -155,8 +155,8 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index d9abf14..b4ef54f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -101,8 +101,8 @@ public class JoinOperatorBaseTest implements Serializable { )); try { - List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true); - List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false); + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), true); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), false); assertEquals(expected, new HashSet>(resultSafe)); assertEquals(expected, new HashSet>(resultRegular)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/65bf092d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 2baf57e..90bbe41 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -132,8 +132,8 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular);