flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [Refactor] [DataSet] Refactor key selector translation in DataSet API. Clean up several compiler warnings.
Date Tue, 19 Jan 2016 10:57:43 GMT
Repository: flink
Updated Branches:
  refs/heads/master 153a67881 -> 544abb937


http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 1193da5..6791741 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -18,20 +18,16 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -89,9 +85,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		// offset semantic information by extracted key fields
 		if(props != null &&
 				this.grouper != null &&
-				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
@@ -109,9 +105,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {
 			// non grouped reduce
-			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
+			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType());
 			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
-					new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
+					new ReduceOperatorBase<>(function, operatorInfo, new int[0], name);
 			
 			po.setInput(input);
 			// the parallelism for a non grouped reduce can only be 1
@@ -120,13 +116,14 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 			return po;
 		}
 		
-		if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+		if (grouper.getKeys() instanceof SelectorFunctionKeys) {
 			
 			// reduce with key selector function
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
-			
-			MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, input, getParallelism());
+			SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+
+			org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> po =
+				translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, input, getParallelism());
 			((PlanUnwrappingReduceOperator<?, ?>) po.getInput()).setCustomPartitioner(grouper.getCustomPartitioner());
 			
 			return po;
@@ -135,9 +132,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 			
 			// reduce with field positions
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
-			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
+			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType());
 			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
-					new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name);
+					new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
 			
 			po.setCustomPartitioner(grouper.getCustomPartitioner());
 			
@@ -153,30 +150,24 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
-			ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int parallelism)
+	private static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateSelectorFunctionReducer(
+		SelectorFunctionKeys<T, ?> rawKeys,
+		ReduceFunction<T> function,
+		TypeInformation<T> inputType,
+		String name,
+		Operator<T> input,
+		int parallelism)
 	{
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
-		
-		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
-		
-		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
+		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
 		
-		PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey);
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+		Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
 		
-		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
-		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
-
-		keyExtractingMap.setInput(input);
-		reducer.setInput(keyExtractingMap);
-		keyRemovingMap.setInput(reducer);
-		
-		// set parallelism
-		keyExtractingMap.setParallelism(input.getParallelism());
+		PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<>(function, keys, name, inputType, typeInfoWithKey);
+		reducer.setInput(keyedInput);
 		reducer.setParallelism(parallelism);
-		keyRemovingMap.setParallelism(parallelism);
-		
-		return keyRemovingMap;
+
+		return SelectorFunctionKeys.appendKeyRemover(reducer, keys);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index b488dd1..6092d14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -124,6 +125,16 @@ public class SortedGrouping<T> extends Grouping<T> {
 	protected Order[] getGroupSortOrders() {
 		return this.groupSortOrders;
 	}
+
+	protected Ordering getGroupOrdering() {
+
+		Ordering o = new Ordering();
+		for(int i=0; i < this.groupSortKeyPositions.length; i++) {
+			o.appendOrdering(this.groupSortKeyPositions[i], null, this.groupSortOrders[i]);
+		}
+
+		return o;
+	}
 	
 	/**
 	 * Uses a custom partitioner for the grouping.


Mime
View raw message