Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF957175AC for ; Fri, 3 Oct 2014 16:25:50 +0000 (UTC) Received: (qmail 56966 invoked by uid 500); 3 Oct 2014 16:25:50 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 56941 invoked by uid 500); 3 Oct 2014 16:25:50 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 56932 invoked by uid 99); 3 Oct 2014 16:25:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2014 16:25:50 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 03 Oct 2014 16:25:02 +0000 Received: (qmail 52429 invoked by uid 99); 3 Oct 2014 16:24:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2014 16:24:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 64924A1F6E4; Fri, 3 Oct 2014 16:24:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Fri, 03 Oct 2014 16:25:16 -0000 Message-Id: In-Reply-To: <3d730bb465384bccbb11d3dc4889e04f@git.apache.org> References: <3d730bb465384bccbb11d3dc4889e04f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/23] git commit: [FLINK-1110] By default, collection-based execution behaves mutable-object safe. X-Virus-Checked: Checked by ClamAV on apache.org [FLINK-1110] By default, collection-based execution behaves mutable-object safe. Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ac69cb3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ac69cb3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ac69cb3e Branch: refs/heads/master Commit: ac69cb3ef1f3ddc5b838d11dba06a05255a543e0 Parents: 3fd3110 Author: Stephan Ewen Authored: Tue Sep 30 21:32:54 2014 +0200 Committer: Stephan Ewen Committed: Fri Oct 3 16:22:34 2014 +0200 ---------------------------------------------------------------------- .../flink/compiler/util/NoOpBinaryUdfOp.java | 2 +- .../flink/compiler/util/NoOpUnaryUdfOp.java | 2 +- .../common/functions/util/CopyingIterator.java | 63 ++++++++++ .../functions/util/CopyingListCollector.java | 43 +++++++ .../common/operators/CollectionExecutor.java | 13 ++- .../api/common/operators/DualInputOperator.java | 2 +- .../common/operators/SingleInputOperator.java | 2 +- .../flink/api/common/operators/Union.java | 2 +- .../operators/base/BulkIterationBase.java | 2 +- .../operators/base/CoGroupOperatorBase.java | 25 ++-- .../base/CollectorMapOperatorBase.java | 2 +- .../operators/base/CrossOperatorBase.java | 27 ++++- .../operators/base/DeltaIterationBase.java | 3 +- .../common/operators/base/FileDataSinkBase.java | 5 +- .../operators/base/FilterOperatorBase.java | 2 +- .../operators/base/FlatMapOperatorBase.java | 26 ++++- .../operators/base/GroupReduceOperatorBase.java | 42 ++++--- .../common/operators/base/JoinOperatorBase.java | 41 ++++--- .../common/operators/base/MapOperatorBase.java | 21 +++- .../base/MapPartitionOperatorBase.java | 21 +++- .../operators/base/PartitionOperatorBase.java | 7 ++ .../operators/base/ReduceOperatorBase.java | 47 +++++--- .../operators/util/ListKeyGroupedIterator.java | 22 ++-- .../base/FlatMapOperatorCollectionTest.java | 33 ++++-- .../operators/base/JoinOperatorBaseTest.java | 19 +-- .../common/operators/base/MapOperatorTest.java | 12 +- .../base/PartitionMapOperatorTest.java | 7 +- .../CollectionExecutionAccumulatorsTest.java | 2 +- .../CollectionExecutionIterationTest.java | 6 +- ...ctionExecutionWithBroadcastVariableTest.java | 2 +- .../base/CoGroupOperatorCollectionTest.java | 37 ++++-- .../operators/base/GroupReduceOperatorTest.java | 28 +++-- .../operators/base/JoinOperatorBaseTest.java | 115 ++++++++++++++++++ .../operators/base/ReduceOperatorTest.java | 28 +++-- .../operators/base/JoinOperatorBaseTest.java | 116 ------------------- 35 files changed, 550 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java index 1abaeac..4b48ec7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java @@ -43,7 +43,7 @@ public class NoOpBinaryUdfOp extends DualInputOperator executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutables) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java index 8f88168..474d3a4 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java @@ -54,7 +54,7 @@ public class NoOpUnaryUdfOp extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutables) { return inputData; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java new file mode 100644 index 0000000..b7a8dc9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.util.Iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.TraversableOnceException; + +public class CopyingIterator implements Iterator, Iterable { + + private final Iterator source; + private final TypeSerializer serializer; + + private boolean available = true; + + public CopyingIterator(Iterator source, TypeSerializer serializer) { + this.source = source; + this.serializer = serializer; + } + + @Override + public Iterator iterator() { + if (available) { + available = false; + return this; + } else { + throw new TraversableOnceException(); + } + } + + @Override + public boolean hasNext() { + return source.hasNext(); + } + + @Override + public E next() { + E next = source.next(); + return serializer.copy(next); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java new file mode 100644 index 0000000..8620981 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.util.List; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +public class CopyingListCollector implements Collector { + + private final List list; + private final TypeSerializer serializer; + + public CopyingListCollector(List list, TypeSerializer serializer) { + this.list = list; + this.serializer = serializer; + } + + @Override + public void collect(T record) { + list.add(serializer.copy(record)); + } + + @Override + public void close() {} +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index a748a5e..5d30df4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -52,13 +52,22 @@ import org.apache.flink.util.Visitor; */ public class CollectionExecutor { + private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true; + private final Map, List> intermediateResults; private final Map> accumulators; + private final boolean mutableObjectSafeMode; + // -------------------------------------------------------------------------------------------- public CollectionExecutor() { + this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE); + } + + public CollectionExecutor(boolean mutableObjectSafeMode) { + this.mutableObjectSafeMode = mutableObjectSafeMode; this.intermediateResults = new HashMap, List>(); this.accumulators = new HashMap>(); } @@ -172,7 +181,7 @@ public class CollectionExecutor { ctx = null; } - List result = typedOp.executeOnCollections(inputData, ctx); + List result = typedOp.executeOnCollections(inputData, ctx, mutableObjectSafeMode); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); @@ -214,7 +223,7 @@ public class CollectionExecutor { ctx = null; } - List result = typedOp.executeOnCollections(inputData1, inputData2, ctx); + List result = typedOp.executeOnCollections(inputData1, inputData2, ctx, mutableObjectSafeMode); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java index 2d28180..6325788 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java @@ -286,5 +286,5 @@ public abstract class DualInputOperator exte // -------------------------------------------------------------------------------------------- - protected abstract List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception; + protected abstract List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java index fa923e7..f1bf2ad 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java @@ -209,5 +209,5 @@ public abstract class SingleInputOperator extends // -------------------------------------------------------------------------------------------- - protected abstract List executeOnCollections(List inputData, RuntimeContext runtimeContext) throws Exception; + protected abstract List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java index f4b3c51..fb8626d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java @@ -48,7 +48,7 @@ public class Union extends DualInputOperator { } @Override - protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { ArrayList result = new ArrayList(inputData1.size() + inputData2.size()); result.addAll(inputData1); result.addAll(inputData2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java index 31b5191..4fbf65e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java @@ -303,7 +303,7 @@ public class BulkIterationBase extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java index 4b6d639..6aa3da0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -35,6 +36,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.Collector; import java.io.IOException; @@ -177,7 +179,7 @@ public class CoGroupOperatorBase executeOnCollections(List input1, List input2, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List input1, List input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception { // -------------------------------------------------------------------- // Setup // -------------------------------------------------------------------- @@ -193,11 +195,15 @@ public class CoGroupOperatorBase inputSerializer1 = inputType1.createSerializer(); + final TypeSerializer inputSerializer2 = inputType2.createSerializer(); + final TypeComparator inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1); final TypeComparator inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2); CoGroupSortListIterator coGroupIterator = - new CoGroupSortListIterator(input1, inputComparator1, input2, inputComparator2); + new CoGroupSortListIterator(input1, inputComparator1, inputSerializer1, + input2, inputComparator2, inputSerializer2, mutableObjectSafe); // -------------------------------------------------------------------- // Run UDF @@ -208,7 +214,9 @@ public class CoGroupOperatorBase result = new ArrayList(); - Collector resultCollector = new ListCollector(result); + Collector resultCollector = mutableObjectSafe ? + new CopyingListCollector(result, getOperatorInfo().getOutputType().createSerializer()) : + new ListCollector(result); while (coGroupIterator.next()) { function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector); @@ -247,13 +255,14 @@ public class CoGroupOperatorBase secondReturn; private CoGroupSortListIterator( - List input1, final TypeComparator inputComparator1, - List input2, final TypeComparator inputComparator2) { - + List input1, final TypeComparator inputComparator1, TypeSerializer serializer1, + List input2, final TypeComparator inputComparator2, TypeSerializer serializer2, + boolean copyElements) + { this.pairComparator = new GenericPairComparator(inputComparator1, inputComparator2); - this.iterator1 = new ListKeyGroupedIterator(input1, inputComparator1); - this.iterator2 = new ListKeyGroupedIterator(input2, inputComparator2); + this.iterator1 = new ListKeyGroupedIterator(input1, serializer1, inputComparator1, copyElements); + this.iterator2 = new ListKeyGroupedIterator(input2, serializer2, inputComparator2, copyElements); // ---------------------------------------------------------------- // Sort http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index c3e00bc..8ad91c6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -52,7 +52,7 @@ public class CollectorMapOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java index c70460a..8f49518 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @see org.apache.flink.api.common.functions.CrossFunction @@ -50,21 +51,37 @@ public class CrossOperatorBase executeOnCollections(List inputData1, List inputData2, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { CrossFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData1.size() * inputData2.size()); - for (IN1 element1 : inputData1) { - for (IN2 element2 : inputData2) { - result.add(function.cross(element1, element2)); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(); + TypeSerializer inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + for (IN1 element1 : inputData1) { + for (IN2 element2 : inputData2) { + IN1 copy1 = inSerializer1.copy(element1); + IN2 copy2 = inSerializer2.copy(element2); + OUT o = function.cross(copy1, copy2); + result.add(outSerializer.copy(o)); + } + } + } + else { + for (IN1 element1 : inputData1) { + for (IN2 element2 : inputData2) { + result.add(function.cross(element1, element2)); + } } } FunctionUtils.closeFunction(function); - return result; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java index fddf6e7..f945b1d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import java.util.Collections; @@ -333,7 +332,7 @@ public class DeltaIterationBase extends DualInputOperator executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java index a36d44b..8f5c4e3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.io.FileOutputFormat; @@ -72,8 +71,7 @@ public class FileDataSinkBase extends GenericDataSinkBase { * * @return The path to which the output shall be written. */ - public String getFilePath() - { + public String getFilePath() { return this.filePath; } @@ -82,5 +80,4 @@ public class FileDataSinkBase extends GenericDataSinkBase { public String toString() { return this.filePath; } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java index 39f8984..7f7add2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java @@ -50,7 +50,7 @@ public class FilterOperatorBase> extends Sin } @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { FlatMapFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.openFunction(function, this.parameters); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java index 81f3bcf..8312a99 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; @@ -27,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; import java.util.List; @@ -51,17 +53,29 @@ public class FlatMapOperatorBase> e // ------------------------------------------------------------------------ @Override - protected List executeOnCollections(List input, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List input, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { FlatMapFunction function = userFunction.getUserCodeObject(); - + FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList result = new ArrayList(input.size()); - ListCollector resultCollector = new ListCollector(result); - - for (IN element : input) { - function.flatMap(element, resultCollector); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer); + + for (IN element : input) { + IN inCopy = inSerializer.copy(element); + function.flatMap(inCopy, resultCollector); + } + } else { + ListCollector resultCollector = new ListCollector(result); + for (IN element : input) { + function.flatMap(element, resultCollector); + } } FunctionUtils.closeFunction(function); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index 6abd7b5..5d3b92d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -18,11 +18,11 @@ package org.apache.flink.api.common.operators.base; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.Ordering; @@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.CompositeType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; import java.util.Collections; @@ -131,20 +132,21 @@ public class GroupReduceOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) - throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { GroupReduceFunction function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation operatorInfo = getOperatorInfo(); TypeInformation inputType = operatorInfo.getInputType(); if (!(inputType instanceof CompositeType)) { - throw new InvalidProgramException("Input type of groupReduce operation must be" + - " composite type."); + throw new InvalidProgramException("Input type of groupReduce operation must be a composite type."); } int[] inputColumns = getKeyColumns(0); boolean[] inputOrderings = new boolean[inputColumns.length]; + + final TypeSerializer inputSerializer = inputType.createSerializer(); + @SuppressWarnings("unchecked") final TypeComparator inputComparator = ((CompositeType) inputType).createComparator(inputColumns, inputOrderings); @@ -152,26 +154,34 @@ public class GroupReduceOperatorBase result = new ArrayList(inputData.size()); - ListCollector collector = new ListCollector(result); - Collections.sort(inputData, new Comparator() { @Override public int compare(IN o1, IN o2) { return inputComparator.compare(o2, o1); } }); - ListKeyGroupedIterator keyedIterator = - new ListKeyGroupedIterator(inputData, inputComparator); - - while (keyedIterator.nextKey()) { - function.reduce(keyedIterator.getValues(), collector); + + ListKeyGroupedIterator keyedIterator = new ListKeyGroupedIterator( + inputData, inputSerializer, inputComparator, mutableObjectSafeMode); + + ArrayList result = new ArrayList(); + + if (mutableObjectSafeMode) { + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + CopyingListCollector collector = new CopyingListCollector(result, outSerializer); + + while (keyedIterator.nextKey()) { + function.reduce(keyedIterator.getValues(), collector); + } + } + else { + ListCollector collector = new ListCollector(result); + while (keyedIterator.nextKey()) { + function.reduce(keyedIterator.getValues(), collector); + } } FunctionUtils.closeFunction(function); - return result; } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java index 34ede65..3d5cf72 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java @@ -16,11 +16,11 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -34,6 +34,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Arrays; @@ -60,7 +62,7 @@ public class JoinOperatorBase executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) throws Exception { FlatJoinFunction function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); @@ -68,13 +70,18 @@ public class JoinOperatorBase leftInformation = getOperatorInfo().getFirstInputType(); TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); - + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeSerializer leftSerializer = mutableObjectSafe ? leftInformation.createSerializer() : null; + TypeSerializer rightSerializer = mutableObjectSafe ? rightInformation.createSerializer() : null; + TypeComparator leftComparator; TypeComparator rightComparator; - if(leftInformation instanceof AtomicType){ + if (leftInformation instanceof AtomicType){ leftComparator = ((AtomicType) leftInformation).createComparator(true); - }else if(leftInformation instanceof CompositeType){ + } + else if(leftInformation instanceof CompositeType){ int[] keyPositions = getKeyColumns(0); boolean[] orders = new boolean[keyPositions.length]; Arrays.fill(orders, true); @@ -102,12 +109,13 @@ public class JoinOperatorBase result = new ArrayList(); - ListCollector collector = new ListCollector(result); + Collector collector = mutableObjectSafe ? new CopyingListCollector(result, outInformation.createSerializer()) + : new ListCollector(result); Map> probeTable = new HashMap>(); - //Build probe table - for(IN2 element: inputData2){ + //Build hash table + for (IN2 element: inputData2){ List list = probeTable.get(rightComparator.hash(element)); if(list == null){ list = new ArrayList(); @@ -118,15 +126,18 @@ public class JoinOperatorBase matchingHashes = probeTable.get(leftComparator.hash(left)); - pairComparator.setReference(left); - - if(matchingHashes != null){ - for(IN2 right: matchingHashes){ - if(pairComparator.equalToReference(right)){ - function.join(left, right, collector); + if (matchingHashes != null) { + pairComparator.setReference(left); + for (IN2 right : matchingHashes){ + if (pairComparator.equalToReference(right)) { + if (mutableObjectSafe) { + function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector); + } else { + function.join(left, right, collector); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java index d545676..0218bfa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import java.util.ArrayList; @@ -30,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; - +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @@ -55,15 +54,27 @@ public class MapOperatorBase> extends S // -------------------------------------------------------------------------------------------- @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { MapFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData.size()); - for (IN element : inputData) { - result.add(function.map(element)); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + for (IN element : inputData) { + IN inCopy = inSerializer.copy(element); + OUT out = function.map(inCopy); + result.add(outSerializer.copy(out)); + } + } else { + for (IN element : inputData) { + result.add(function.map(element)); + } } FunctionUtils.closeFunction(function); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java index 21fa9be..7c1fcef 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingIterator; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; @@ -30,6 +32,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @@ -54,18 +57,28 @@ public class MapPartitionOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { MapPartitionFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData.size() / 4); - ListCollector resultCollector = new ListCollector(result); - function.mapPartition(inputData, resultCollector); + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + CopyingIterator source = new CopyingIterator(inputData.iterator(), inSerializer); + CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer); + + function.mapPartition(source, resultCollector); + } else { + ListCollector resultCollector = new ListCollector(result); + function.mapPartition(inputData, resultCollector); + } + result.trimToSize(); - FunctionUtils.closeFunction(function); return result; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java index 1f17db0..af8a111 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java @@ -18,6 +18,9 @@ package org.apache.flink.api.common.operators.base; +import java.util.List; + +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -51,4 +54,8 @@ public class PartitionOperatorBase extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + return inputData; + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java index f7975a1..1f192f8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.InvalidProgramException; @@ -32,13 +31,14 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.CompositeType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - /** * Base data flow operator for Reduce user-defined functions. Accepts reduce functions * and key positions. The key positions are expected in the flattened common data model. @@ -123,20 +123,23 @@ public class ReduceOperatorBase> extends SingleI super(new UserCodeClassWrapper(udf), operatorInfo, name); } -// -------------------------------------------------------------------------------------------- + // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) - throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + // make sure we can handle empty inputs + if (inputData.isEmpty()) { + return Collections.emptyList(); + } + ReduceFunction function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation operatorInfo = getOperatorInfo(); TypeInformation inputType = operatorInfo.getInputType(); if (!(inputType instanceof CompositeType)) { - throw new InvalidProgramException("Input type of groupReduce operation must be" + - " composite type."); + throw new InvalidProgramException("Input type of groupReduce operation must be" + " composite type."); } FunctionUtils.setFunctionRuntimeContext(function, ctx); @@ -161,22 +164,30 @@ public class ReduceOperatorBase> extends SingleI aggregateMap.put(wrapper, result); } - List result = new ArrayList(aggregateMap.values().size()); - result.addAll(aggregateMap.values()); - FunctionUtils.closeFunction(function); - return result; - } else { + return new ArrayList(aggregateMap.values()); + } + else { T aggregate = inputData.get(0); - for (int i = 1; i < inputData.size(); i++) { - aggregate = function.reduce(aggregate, inputData.get(i)); + + if (mutableObjectSafeMode) { + TypeSerializer serializer = getOperatorInfo().getInputType().createSerializer(); + aggregate = serializer.copy(aggregate); + + for (int i = 1; i < inputData.size(); i++) { + T next = function.reduce(aggregate, serializer.copy(inputData.get(i))); + aggregate = serializer.copy(next); + } + } + else { + for (int i = 1; i < inputData.size(); i++) { + aggregate = function.reduce(aggregate, inputData.get(i)); + } } - List result = new ArrayList(1); - result.add(aggregate); FunctionUtils.setFunctionRuntimeContext(function, ctx); - return result; + + return Collections.singletonList(aggregate); } - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java index 8a4c919..e530f8a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.util; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.io.IOException; import java.util.Iterator; @@ -32,14 +33,16 @@ public final class ListKeyGroupedIterator { private final List input; + private final TypeSerializer serializer; // != null if the elements should be copied + private final TypeComparator comparator; private ValuesIterator valuesIterator; private int currentPosition = 0; - private E lookahead = null; - + private E lookahead; + private boolean done; /** @@ -48,13 +51,13 @@ public final class ListKeyGroupedIterator { * @param input The list with the input elements. * @param comparator The comparator for the data type iterated over. */ - public ListKeyGroupedIterator(List input, TypeComparator comparator) - { + public ListKeyGroupedIterator(List input, TypeSerializer serializer, TypeComparator comparator, boolean copy) { if (input == null || comparator == null) { throw new NullPointerException(); } this.input = input; + this.serializer = copy ? serializer : null; this.comparator = comparator; this.done = input.isEmpty() ? true : false; @@ -109,7 +112,7 @@ public final class ListKeyGroupedIterator { E first = input.get(currentPosition++); if (first != null) { this.comparator.setReference(first); - this.valuesIterator = new ValuesIterator(first); + this.valuesIterator = new ValuesIterator(first, serializer); return true; } else { @@ -155,9 +158,12 @@ public final class ListKeyGroupedIterator { public final class ValuesIterator implements Iterator, Iterable { private E next; + + private final TypeSerializer serializer; - private ValuesIterator(E first) { + private ValuesIterator(E first, TypeSerializer serializer) { this.next = first; + this.serializer = serializer; } @Override @@ -170,7 +176,7 @@ public final class ListKeyGroupedIterator { if (this.next != null) { E current = this.next; this.next = ListKeyGroupedIterator.this.advanceToNext(); - return current; + return serializer != null ? serializer.copy(current) : current; } else { throw new NoSuchElementException(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 8227c22..74dc889 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,8 +30,8 @@ import org.junit.Assert; import org.junit.Test; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; @SuppressWarnings("serial") @@ -41,25 +41,34 @@ public class FlatMapOperatorCollectionTest implements Serializable { public void testExecuteOnCollection() { try { IdRichFlatMap udf = new IdRichFlatMap(); - testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k")); + testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), true); Assert.assertTrue(udf.isClosed); - testExecuteOnCollection(new IdRichFlatMap(), new ArrayList()); - } catch (Throwable t) { - Assert.fail(t.getMessage()); + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), false); + Assert.assertTrue(udf.isClosed); + + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Collections.emptyList(), true); + Assert.assertTrue(udf.isClosed); + + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Collections.emptyList(), false); + Assert.assertTrue(udf.isClosed); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); } } - private void testExecuteOnCollection(FlatMapFunction udf, List input) throws Exception { + private void testExecuteOnCollection(FlatMapFunction udf, List input, boolean mutableSafe) throws Exception { // run on collections final List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0)); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe); Assert.assertEquals(input.size(), result.size()); - - for (int i = 0; i < input.size(); i++) { - Assert.assertEquals(input.get(i), result.get(i)); - } + Assert.assertEquals(input, result); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index b048cc5..8834989 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -60,10 +60,13 @@ public class JoinOperatorBaseTest implements Serializable { List expected = new ArrayList(Arrays.asList(3, 3, 6 ,6)); try { - List result = base.executeOnCollections(inputData1, inputData2, null); + List resultSafe = base.executeOnCollections(inputData1, inputData2, null, true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, null, false); - assertEquals(expected, result); - } catch (Exception e) { + assertEquals(expected, resultSafe); + assertEquals(expected, resultRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -107,11 +110,13 @@ public class JoinOperatorBaseTest implements Serializable { try { - List result = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, - 1, 0)); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false); - assertEquals(expected, result); - } catch (Exception e) { + assertEquals(expected, resultSafe); + assertEquals(expected, resultRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index bb263ad..82778c5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -52,9 +52,11 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), "TestMapper"); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, null); + List resultMutableSafe = op.executeOnCollections(input, null, true); + List resultRegular = op.executeOnCollections(input, null, false); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); } catch (Exception e) { e.printStackTrace(); @@ -95,9 +97,11 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0)); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); assertTrue(opened.get()); assertTrue(closed.get()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 0657ac1..1c17fde 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -74,9 +74,12 @@ public class PartitionMapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0)); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); assertTrue(opened.get()); assertTrue(closed.get()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java index 7c89f13..f836692 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java index 0f6acbe..34f9137 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +20,6 @@ package org.apache.flink.api.common.operators; //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import static org.junit.Assert.*; -//CHECKSTYLE.ON: AvoidStarImport import java.util.ArrayList; import java.util.List; @@ -102,11 +101,14 @@ public class CollectionExecutionIterationTest implements java.io.Serializable { try { ExecutionEnvironment env = new CollectionEnvironment(); + @SuppressWarnings("unchecked") DataSet> solInput = env.fromElements( new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0), new Tuple2(4, 0)); + + @SuppressWarnings("unchecked") DataSet> workInput = env.fromElements( new Tuple1(1), new Tuple1(2), http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java index adf2d96..c2db7c9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index fe2551d..053b8e2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +@SuppressWarnings("serial") public class CoGroupOperatorCollectionTest implements Serializable { @Test @@ -68,12 +69,17 @@ public class CoGroupOperatorCollectionTest implements Serializable { final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0); { - SumCoGroup udf = new SumCoGroup(); - List> result = getCoGroupOperator(udf) - .executeOnCollections(input1, input2, ctx); - - Assert.assertTrue(udf.isClosed); - + SumCoGroup udf1 = new SumCoGroup(); + SumCoGroup udf2 = new SumCoGroup(); + + List> resultSafe = getCoGroupOperator(udf1) + .executeOnCollections(input1, input2, ctx, true); + List> resultRegular = getCoGroupOperator(udf2) + .executeOnCollections(input1, input2, ctx, false); + + Assert.assertTrue(udf1.isClosed); + Assert.assertTrue(udf2.isClosed); + Set> expected = new HashSet>( Arrays.asList(new Tuple2Builder() .add("foo", 8) @@ -84,14 +90,21 @@ public class CoGroupOperatorCollectionTest implements Serializable { ) ); - Assert.assertEquals(expected, new HashSet(result)); + Assert.assertEquals(expected, new HashSet>(resultSafe)); + Assert.assertEquals(expected, new HashSet>(resultRegular)); } { - List> result = getCoGroupOperator(new SumCoGroup()) - .executeOnCollections(Collections.EMPTY_LIST, Collections.EMPTY_LIST, ctx); - - Assert.assertEquals(0, result.size()); + List> resultSafe = getCoGroupOperator(new SumCoGroup()) + .executeOnCollections(Collections.>emptyList(), + Collections.>emptyList(), ctx, true); + + List> resultRegular = getCoGroupOperator(new SumCoGroup()) + .executeOnCollections(Collections.>emptyList(), + Collections.>emptyList(), ctx, false); + + Assert.assertEquals(0, resultSafe.size()); + Assert.assertEquals(0, resultRegular.size()); } } catch (Throwable t) { t.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index f77b292..5d1ca17 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -79,15 +79,21 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, null); - Set> resultSet = new HashSet>(result); + + List> resultMutableSafe = op.executeOnCollections(input, null, true); + List> resultRegular = op.executeOnCollections(input, null, false); + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); - } catch (Exception e) { + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -149,15 +155,19 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, - new RuntimeUDFContext(taskName, 1, 0)); - Set> resultSet = new HashSet>(result); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); assertTrue(opened.get()); assertTrue(closed.get()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java new file mode 100644 index 0000000..d9abf14 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@SuppressWarnings({ "unchecked", "serial" }) +public class JoinOperatorBaseTest implements Serializable { + + + @Test + public void testTupleBaseJoiner(){ + final FlatJoinFunction, Tuple2, Tuple2> joiner = + new FlatJoinFunction, Tuple2, Tuple2>() + { + @Override + public void join(Tuple3 first, Tuple2 second, Collector> out) { + Tuple3 fst = (Tuple3)first; + Tuple2 snd = (Tuple2)second; + + assertEquals(fst.f0, snd.f1); + assertEquals(fst.f2, snd.f0); + + out.collect(new Tuple2(fst.f1, snd.f0.toString())); + } + }; + + final TupleTypeInfo> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo + (String.class, Double.class, Integer.class); + final TupleTypeInfo> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, + String.class); + final TupleTypeInfo> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class, + String.class); + + final int[] leftKeys = new int[]{0,2}; + final int[] rightKeys = new int[]{1,0}; + + final String taskName = "Collection based tuple joiner"; + + final BinaryOperatorInformation, Tuple2, Tuple2> binaryOpInfo = new BinaryOperatorInformation, Tuple2, Tuple2>(leftTypeInfo, rightTypeInfo, outTypeInfo); + + final JoinOperatorBase, Tuple2, Tuple2, FlatJoinFunction, Tuple2, Tuple2>> base = new JoinOperatorBase, + Tuple2, Tuple2, FlatJoinFunction, + Tuple2, Tuple2>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName); + + final List > inputData1 = new ArrayList>(Arrays.asList( + new Tuple3("foo", 42.0, 1), + new Tuple3("bar", 1.0, 2), + new Tuple3("bar", 2.0, 3), + new Tuple3("foobar", 3.0, 4), + new Tuple3("bar", 3.0, 3) + )); + + final List> inputData2 = new ArrayList>(Arrays.asList( + new Tuple2(3, "bar"), + new Tuple2(4, "foobar"), + new Tuple2(2, "foo") + )); + final Set> expected = new HashSet>(Arrays.asList( + new Tuple2(2.0, "3"), + new Tuple2(3.0, "3"), + new Tuple2(3.0, "4") + )); + + try { + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false); + + assertEquals(expected, new HashSet>(resultSafe)); + assertEquals(expected, new HashSet>(resultRegular)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index a4de40b..2baf57e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -68,15 +68,20 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, null); - Set> resultSet = new HashSet>(result); + List> resultMutableSafe = op.executeOnCollections(input, null, true); + List> resultRegular = op.executeOnCollections(input, null, false); + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); - } catch (Exception e) { + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -127,20 +132,23 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, - new RuntimeUDFContext(taskName, 1, 0)); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); - Set> resultSet = new HashSet>(result); + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); assertTrue(opened.get()); assertTrue(closed.get()); - } catch (Exception e) { + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java deleted file mode 100644 index f332832..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators.base; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.operators.BinaryOperatorInformation; -import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.util.Collector; -import org.junit.Test; - -import java.io.Serializable; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class JoinOperatorBaseTest implements Serializable { - - @Test - public void testTupleBaseJoiner(){ - final FlatJoinFunction, Tuple2, Tuple2> joiner = new FlatJoinFunction() { - @Override - public void join(Object first, Object second, Collector out) throws Exception { - Tuple3 fst = (Tuple3)first; - Tuple2 snd = (Tuple2)second; - - assertEquals(fst.f0, snd.f1); - assertEquals(fst.f2, snd.f0); - - out.collect(new Tuple2(fst.f1, snd.f0.toString())); - } - }; - - final TupleTypeInfo> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo - (String.class, Double.class, Integer.class); - final TupleTypeInfo> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, - String.class); - final TupleTypeInfo> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class, - String.class); - - final int[] leftKeys = new int[]{0,2}; - final int[] rightKeys = new int[]{1,0}; - - final String taskName = "Collection based tuple joiner"; - - final BinaryOperatorInformation, Tuple2, Tuple2> binaryOpInfo = new BinaryOperatorInformation, Tuple2, Tuple2>(leftTypeInfo, rightTypeInfo, outTypeInfo); - - final JoinOperatorBase, Tuple2, Tuple2, FlatJoinFunction, Tuple2, Tuple2>> base = new JoinOperatorBase, - Tuple2, Tuple2, FlatJoinFunction, - Tuple2, Tuple2>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName); - - final List > inputData1 = new ArrayList>(Arrays.asList( - new Tuple3("foo", 42.0, 1), - new Tuple3("bar", 1.0, 2), - new Tuple3("bar", 2.0, 3), - new Tuple3("foobar", 3.0, 4), - new Tuple3("bar", 3.0, 3) - )); - - final List> inputData2 = new ArrayList>(Arrays.asList( - new Tuple2(3, "bar"), - new Tuple2(4, "foobar"), - new Tuple2(2, "foo") - )); - final Set> expected = new HashSet>(Arrays.asList( - new Tuple2(2.0, "3"), - new Tuple2(3.0, "3"), - new Tuple2(3.0, "4") - )); - - try { - Method executeOnCollections = base.getClass().getDeclaredMethod("executeOnCollections", List.class, - List.class, RuntimeContext.class); - executeOnCollections.setAccessible(true); - - Object result = executeOnCollections.invoke(base, inputData1, inputData2, null); - - assertEquals(expected, new HashSet>((List>)result)); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - } -}