flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/23] git commit: [FLINK-1110] Implement collection-based execution for mapPartition.
Date Fri, 03 Oct 2014 16:24:59 GMT
[FLINK-1110] Implement collection-based execution for mapPartition.

Make the groupReduce code compliant with pre-Java-8 versions, and fix the Java 8 tests to use the moved type
information classes.

Fix various warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3eac6f23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3eac6f23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3eac6f23

Branch: refs/heads/master
Commit: 3eac6f239cc0c6fe0e7fae6718215b626446df44
Parents: fd3f5c2
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Sep 22 23:23:17 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 16:22:33 2014 +0200

----------------------------------------------------------------------
 .../base/CollectorMapOperatorBase.java          | 11 ++-
 .../operators/base/GroupReduceOperatorBase.java |  6 +-
 .../base/MapPartitionOperatorBase.java          | 25 ++++++
 .../FlatMapOperatorCollectionExecutionTest.java |  2 +
 .../base/PartitionMapOperatorTest.java          | 89 ++++++++++++++++++++
 .../CollectionExecutionAccumulatorsTest.java    | 75 +++++++++++++++++
 .../operators/base/GroupReduceOperatorTest.java |  2 -
 .../api/java/operator/MaxByOperatorTest.java    |  3 +-
 .../api/java/operator/MinByOperatorTest.java    |  2 +-
 .../javaApiOperators/lambdas/ReduceITCase.java  |  2 +-
 10 files changed, 208 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index eb246be..c3e00bc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
-
 /**
  * The CollectorMap is the old version of the Map operator. It is effectively a "flatMap", where the
  * UDF is called "map".
@@ -46,4 +48,11 @@ public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN
 	public CollectorMapOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
 		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index b309706..6abd7b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
@@ -144,6 +145,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 
 		int[] inputColumns = getKeyColumns(0);
 		boolean[] inputOrderings = new boolean[inputColumns.length];
+		@SuppressWarnings("unchecked")
 		final TypeComparator<IN> inputComparator =
 				((CompositeType<IN>) inputType).createComparator(inputColumns, inputOrderings);
 
@@ -154,10 +156,10 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
 		ListCollector<OUT> collector = new ListCollector<OUT>(result);
 
-		inputData.sort( new Comparator<IN>() {
+		Collections.sort(inputData, new Comparator<IN>() {
 			@Override
 			public int compare(IN o1, IN o2) {
-				return - inputComparator.compare(o1, o2);
+				return inputComparator.compare(o2, o1);
 			}
 		});
 		ListKeyGroupedIterator<IN> keyedIterator =

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index e2c3a08..21fa9be 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -18,7 +18,13 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -44,4 +50,23 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I
 	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
 		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
 	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx) throws Exception {
+		MapPartitionFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
+		
+		FunctionUtils.setFunctionRuntimeContext(function, ctx);
+		FunctionUtils.openFunction(function, this.parameters);
+		
+		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4);
+		ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
+		
+		function.mapPartition(inputData, resultCollector);
+		result.trimToSize();
+		
+		FunctionUtils.closeFunction(function);
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java
index 72b555a..ba51ed8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+@SuppressWarnings("serial")
 public class FlatMapOperatorCollectionExecutionTest implements Serializable {
 
 	@Test
@@ -61,6 +62,7 @@ public class FlatMapOperatorCollectionExecutionTest implements Serializable {
 		}
 	}
 
+
 	public class IdRichFlatMap<IN> extends RichFlatMapFunction<IN, IN> {
 
 		private boolean isOpened = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
new file mode 100644
index 0000000..0657ac1
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionMapOperatorTest implements java.io.Serializable {
+
+	@Test
+	public void testMapPartitionWithRuntimeContext() {
+		try {
+			final String taskName = "Test Task";
+			final AtomicBoolean opened = new AtomicBoolean();
+			final AtomicBoolean closed = new AtomicBoolean();
+			
+			final MapPartitionFunction<String, Integer> parser = new RichMapPartitionFunction<String, Integer>() {
+				
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					opened.set(true);
+					RuntimeContext ctx = getRuntimeContext();
+					assertEquals(0, ctx.getIndexOfThisSubtask());
+					assertEquals(1, ctx.getNumberOfParallelSubtasks());
+					assertEquals(taskName, ctx.getTaskName());
+				}
+				
+				@Override
+				public void mapPartition(Iterable<String> values, Collector<Integer> out) {
+					for (String s : values) {
+						out.collect(Integer.parseInt(s));
+					}
+				}
+				
+				@Override
+				public void close() throws Exception {
+					closed.set(true);
+				}
+			};
+			
+			MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String, Integer>> op = 
+					new MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String,Integer>>(
+					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
+			
+			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
+			List<Integer> result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0));
+			
+			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+			
+			assertTrue(opened.get());
+			assertTrue(closed.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
new file mode 100644
index 0000000..7c89f13
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+public class CollectionExecutionAccumulatorsTest {
+
+	private static final String ACCUMULATOR_NAME = "TEST ACC";
+	
+	@Test
+	public void testAccumulator() {
+		try {
+			final int NUM_ELEMENTS = 100;
+			
+			ExecutionEnvironment env = new CollectionEnvironment();
+			
+			env.generateSequence(1, NUM_ELEMENTS)
+				.map(new CountingMapper())
+				.output(new DiscardingOuputFormat<Long>());
+			
+			JobExecutionResult result = env.execute();
+			
+			assertTrue(result.getNetRuntime() >= 0);
+			
+			assertEquals(NUM_ELEMENTS, result.getAccumulatorResult(ACCUMULATOR_NAME));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static class CountingMapper extends RichMapFunction<Long, Long> {
+		
+		private IntCounter accumulator;
+		
+		@Override
+		public void open(Configuration parameters) {
+			accumulator = getRuntimeContext().getIntCounter(ACCUMULATOR_NAME);
+		}
+		
+		@Override
+		public Long map(Long value) {
+			accumulator.add(1);
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 6c655ce..f77b292 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
index 76470b6..2af8a8c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
@@ -21,8 +21,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class MaxByOperatorTest {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
index 72e91b6..5d9c938 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3eac6f23/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
index 52c215f..9fdb837 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -69,7 +69,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 				BasicTypeInfo.LONG_TYPE_INFO,
 				BasicTypeInfo.INT_TYPE_INFO,
 				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
+				BasicTypeInfo.LONG_TYPE_INFO
 		);
 
 		return env.fromCollection(data, type);


Mime
View raw message