flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [12/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:12 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 0c0b710..c02d365 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
@@ -42,9 +43,9 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
 import org.apache.flink.api.java.operators.join.JoinType;
 import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
@@ -394,8 +395,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 				@SuppressWarnings("unchecked")
 				SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>)rawKeys1;
-				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
-				Operator<Tuple2<K, I1>> keyMapper1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
+				Operator<Tuple2<K, I1>> keyMapper1 = KeyFunctions.appendKeyExtractor(input1, keys1);
 
 				return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1);
 			}
@@ -406,8 +407,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 				@SuppressWarnings("unchecked")
 				SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>)rawKeys2;
-				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
-				Operator<Tuple2<K, I2>> keyMapper2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
+				Operator<Tuple2<K, I2>> keyMapper2 = KeyFunctions.appendKeyExtractor(input2, keys2);
 
 				return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
new file mode 100644
index 0000000..49d598a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This class holds static utilities to append functions that extract and
+ * prune keys.
+ */
+public class KeyFunctions {
+
+	@SuppressWarnings("unchecked")
+	public static <T, K> org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
+			org.apache.flink.api.common.operators.Operator<T> input,
+			SelectorFunctionKeys<T, K> key)
+	{
+
+		TypeInformation<T> inputType = key.getInputType();
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());
+
+		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
+				new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
+						extractor,
+						new UnaryOperatorInformation(inputType, typeInfoWithKey),
+						"Key Extractor"
+				);
+
+		mapper.setInput(input);
+		mapper.setParallelism(input.getParallelism());
+
+		return mapper;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <T, K1, K2> org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
+			org.apache.flink.api.common.operators.Operator<T> input,
+			SelectorFunctionKeys<T, K1> key1,
+			SelectorFunctionKeys<T, K2> key2)
+	{
+
+		TypeInformation<T> inputType = key1.getInputType();
+		TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
+		TwoKeyExtractingMapper<T, K1, K2> extractor =
+				new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());
+
+		MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
+				new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
+						extractor,
+						new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+						"Key Extractor"
+				);
+
+		mapper.setInput(input);
+		mapper.setParallelism(input.getParallelism());
+
+		return mapper;
+	}
+
+	public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
+			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> inputWithKey,
+			SelectorFunctionKeys<T, K> key)
+	{
+
+		TypeInformation<T> inputType = key.getInputType();
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+
+		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
+				new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
+						new KeyRemovingMapper<T, K>(),
+						new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
+						"Key Remover"
+				);
+		mapper.setInput(inputWithKey);
+		mapper.setParallelism(inputWithKey.getParallelism());
+
+		return mapper;
+	}
+
+	public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
+			SelectorFunctionKeys<T, K> key)
+	{
+		return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
+	}
+
+	public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
+			SelectorFunctionKeys<T, K1> key1,
+			SelectorFunctionKeys<T, K2> key2)
+	{
+		return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
deleted file mode 100644
index 5992f0b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
-import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import com.google.common.base.Preconditions;
-
-public abstract class Keys<T> {
-
-	public abstract int getNumberOfKeyFields();
-
-	public abstract int[] computeLogicalKeyPositions();
-
-	protected abstract TypeInformation<?>[] getKeyFieldTypes();
-
-	public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo);
-
-	public boolean isEmpty() {
-		return getNumberOfKeyFields() == 0;
-	}
-
-	/**
-	 * Check if two sets of keys are compatible to each other (matching types, key counts)
-	 */
-	public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException {
-
-		TypeInformation<?>[] thisKeyFieldTypes = this.getKeyFieldTypes();
-		TypeInformation<?>[] otherKeyFieldTypes = other.getKeyFieldTypes();
-
-		if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) {
-			throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
-		} else {
-			for (int i = 0; i < thisKeyFieldTypes.length; i++) {
-				if (!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) {
-					throw new IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] );
-				}
-			}
-		}
-		return true;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Specializations for expression-based / extractor-based grouping
-	// --------------------------------------------------------------------------------------------
-	
-	
-	public static class SelectorFunctionKeys<T, K> extends Keys<T> {
-
-		private final KeySelector<T, K> keyExtractor;
-		private final TypeInformation<T> inputType;
-		private final TypeInformation<K> keyType;
-		private final List<FlatFieldDescriptor> keyFields;
-
-		public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) {
-
-			if (keyExtractor == null) {
-				throw new NullPointerException("Key extractor must not be null.");
-			}
-			if (keyType == null) {
-				throw new NullPointerException("Key type must not be null.");
-			}
-			if (!keyType.isKeyType()) {
-				throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type");
-			}
-
-			this.keyExtractor = keyExtractor;
-			this.inputType = inputType;
-			this.keyType = keyType;
-
-			if (keyType instanceof CompositeType) {
-				this.keyFields = ((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
-			}
-			else {
-				this.keyFields = new ArrayList<>(1);
-				this.keyFields.add(new FlatFieldDescriptor(0, keyType));
-			}
-		}
-
-		public TypeInformation<K> getKeyType() {
-			return keyType;
-		}
-
-		public TypeInformation<T> getInputType() {
-			return inputType;
-		}
-
-		public KeySelector<T, K> getKeyExtractor() {
-			return keyExtractor;
-		}
-
-		@Override
-		public int getNumberOfKeyFields() {
-			return keyFields.size();
-		}
-
-		@Override
-		public int[] computeLogicalKeyPositions() {
-			int[] logicalKeys = new int[keyFields.size()];
-			for (int i = 0; i < keyFields.size(); i++) {
-				logicalKeys[i] = keyFields.get(i).getPosition();
-			}
-			return logicalKeys;
-		}
-
-		@Override
-		protected TypeInformation<?>[] getKeyFieldTypes() {
-			TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
-			for (int i = 0; i < keyFields.size(); i++) {
-				fieldTypes[i] = keyFields.get(i).getType();
-			}
-			return fieldTypes;
-		}
-		
-		@Override
-		public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
-
-			if (keyFields.size() != 1) {
-				throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
-			}
-			
-			if (typeInfo == null) {
-				// try to extract key type from partitioner
-				try {
-					typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
-				}
-				catch (Throwable t) {
-					// best effort check, so we ignore exceptions
-				}
-			}
-
-			// only check if type is known and not a generic type
-			if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
-				// check equality of key and partitioner type
-				if (!keyType.equals(typeInfo)) {
-					throw new InvalidProgramException("The partitioner is incompatible with the key type. "
-						+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
-				}
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor(
-			Operator<T> input,
-			SelectorFunctionKeys<T, K> key)
-		{
-
-			TypeInformation<T> inputType = key.getInputType();
-			TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
-			KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());
-
-			MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
-				new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
-					extractor,
-					new UnaryOperatorInformation(inputType, typeInfoWithKey),
-					"Key Extractor"
-				);
-
-			mapper.setInput(input);
-			mapper.setParallelism(input.getParallelism());
-
-			return mapper;
-		}
-
-		@SuppressWarnings("unchecked")
-		public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
-			Operator<T> input,
-			SelectorFunctionKeys<T, K1> key1,
-			SelectorFunctionKeys<T, K2> key2)
-		{
-
-			TypeInformation<T> inputType = key1.getInputType();
-			TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
-			TwoKeyExtractingMapper<T, K1, K2> extractor =
-				new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());
-
-			MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
-				new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
-					extractor,
-					new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
-					"Key Extractor"
-				);
-
-			mapper.setInput(input);
-			mapper.setParallelism(input.getParallelism());
-
-			return mapper;
-		}
-
-		public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
-			Operator<Tuple2<K, T>> inputWithKey,
-			SelectorFunctionKeys<T, K> key)
-		{
-
-			TypeInformation<T> inputType = key.getInputType();
-			TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
-
-			MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
-				new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
-					new KeyRemovingMapper<T, K>(),
-					new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
-					"Key Remover"
-				);
-			mapper.setInput(inputWithKey);
-			mapper.setParallelism(inputWithKey.getParallelism());
-
-			return mapper;
-		}
-
-		public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
-			SelectorFunctionKeys<T, K> key)
-		{
-			return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
-		}
-
-		public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
-			SelectorFunctionKeys<T, K1> key1,
-			SelectorFunctionKeys<T, K2> key2)
-		{
-			return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
-		}
-
-		@Override
-		public String toString() {
-			return "Key function (Type: " + keyType + ")";
-		}
-	}
-	
-	
-	/**
-	 * Represents (nested) field access through string and integer-based keys
-	 */
-	public static class ExpressionKeys<T> extends Keys<T> {
-		
-		public static final String SELECT_ALL_CHAR = "*";
-		public static final String SELECT_ALL_CHAR_SCALA = "_";
-		
-		// Flattened fields representing keys fields
-		private List<FlatFieldDescriptor> keyFields;
-
-		/**
-		 * ExpressionKeys that is defined by the full data type.
-		 */
-		public ExpressionKeys(TypeInformation<T> type) {
-			this(SELECT_ALL_CHAR, type);
-		}
-
-		/**
-		 * Create int-based (non-nested) field position keys on a tuple type.
-		 */
-		public ExpressionKeys(int keyPosition, TypeInformation<T> type) {
-			this(new int[]{keyPosition}, type, false);
-		}
-
-		/**
-		 * Create int-based (non-nested) field position keys on a tuple type.
-		 */
-		public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) {
-			this(keyPositions, type, false);
-		}
-
-		/**
-		 * Create int-based (non-nested) field position keys on a tuple type.
-		 */
-		public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) {
-
-			if (!type.isTupleType() || !(type instanceof CompositeType)) {
-				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
-						"for tuple data types. Type: " + type);
-			}
-			if (type.getArity() == 0) {
-				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
-			}
-			if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) {
-				throw new IllegalArgumentException("The grouping fields must not be empty.");
-			}
-
-			this.keyFields = new ArrayList<>();
-
-			if (keyPositions == null || keyPositions.length == 0) {
-				// use all tuple fields as key fields
-				keyPositions = createIncrIntArray(type.getArity());
-			} else {
-				rangeCheckFields(keyPositions, type.getArity() - 1);
-			}
-			Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
-
-			// extract key field types
-			CompositeType<T> cType = (CompositeType<T>)type;
-			this.keyFields = new ArrayList<>(type.getTotalFields());
-
-			// for each key position, find all (nested) field types
-			String[] fieldNames = cType.getFieldNames();
-			ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>();
-			for (int keyPos : keyPositions) {
-				tmpList.clear();
-				// get all flat fields
-				cType.getFlatFields(fieldNames[keyPos], 0, tmpList);
-				// check if fields are of key type
-				for(FlatFieldDescriptor ffd : tmpList) {
-					if(!ffd.getType().isKeyType()) {
-						throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key.");
-					}
-				}
-				this.keyFields.addAll(tmpList);
-			}
-		}
-
-		/**
-		 * Create String-based (nested) field expression keys on a composite type.
-		 */
-		public ExpressionKeys(String keyExpression, TypeInformation<T> type) {
-			this(new String[]{keyExpression}, type);
-		}
-
-		/**
-		 * Create String-based (nested) field expression keys on a composite type.
-		 */
-		public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
-			Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null.");
-
-			this.keyFields = new ArrayList<>(keyExpressions.length);
-
-			if (type instanceof CompositeType){
-				CompositeType<T> cType = (CompositeType<T>) type;
-
-				// extract the keys on their flat position
-				for (String keyExpr : keyExpressions) {
-					if (keyExpr == null) {
-						throw new InvalidProgramException("Expression key may not be null.");
-					}
-					// strip off whitespace
-					keyExpr = keyExpr.trim();
-
-					List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);
-
-					if (flatFields.size() == 0) {
-						throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
-					}
-					// check if all nested fields can be used as keys
-					for (FlatFieldDescriptor field : flatFields) {
-						if (!field.getType().isKeyType()) {
-							throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
-						}
-					}
-					// add flat fields to key fields
-					keyFields.addAll(flatFields);
-				}
-			}
-			else {
-				if (!type.isKeyType()) {
-					throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
-				}
-
-				// check that all key expressions are valid
-				for (String keyExpr : keyExpressions) {
-					if (keyExpr == null) {
-						throw new InvalidProgramException("Expression key may not be null.");
-					}
-					// strip off whitespace
-					keyExpr = keyExpr.trim();
-					// check that full type is addressed
-					if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
-						throw new InvalidProgramException(
-							"Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
-					}
-					// add full type as key
-					keyFields.add(new FlatFieldDescriptor(0, type));
-				}
-			}
-		}
-		
-		@Override
-		public int getNumberOfKeyFields() {
-			if(keyFields == null) {
-				return 0;
-			}
-			return keyFields.size();
-		}
-
-		@Override
-		public int[] computeLogicalKeyPositions() {
-			int[] logicalKeys = new int[keyFields.size()];
-			for (int i = 0; i < keyFields.size(); i++) {
-				logicalKeys[i] = keyFields.get(i).getPosition();
-			}
-			return logicalKeys;
-		}
-
-		@Override
-		protected TypeInformation<?>[] getKeyFieldTypes() {
-			TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
-			for (int i = 0; i < keyFields.size(); i++) {
-				fieldTypes[i] = keyFields.get(i).getType();
-			}
-			return fieldTypes;
-		}
-
-		@Override
-		public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
-
-			if (keyFields.size() != 1) {
-				throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
-			}
-
-			if (typeInfo == null) {
-				// try to extract key type from partitioner
-				try {
-					typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
-				}
-				catch (Throwable t) {
-					// best effort check, so we ignore exceptions
-				}
-			}
-
-			if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
-				// only check type compatibility if type is known and not a generic type
-
-				TypeInformation<?> keyType = keyFields.get(0).getType();
-				if (!keyType.equals(typeInfo)) {
-					throw new InvalidProgramException("The partitioner is incompatible with the key type. "
-										+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
-				}
-			}
-		}
-
-		@Override
-		public String toString() {
-			Joiner join = Joiner.on('.');
-			return "ExpressionKeys: " + join.join(keyFields);
-		}
-
-		public static boolean isSortKey(int fieldPos, TypeInformation<?> type) {
-
-			if (!type.isTupleType() || !(type instanceof CompositeType)) {
-				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
-					"for tuple data types. Type: " + type);
-			}
-			if (type.getArity() == 0) {
-				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
-			}
-
-			if(fieldPos < 0 || fieldPos >= type.getArity()) {
-				throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos);
-			}
-
-			TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos);
-			return sortKeyType.isSortKeyType();
-		}
-
-		public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) {
-
-			TypeInformation<?> sortKeyType;
-
-			fieldExpr = fieldExpr.trim();
-			if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) {
-				sortKeyType = type;
-			}
-			else {
-				if (type instanceof CompositeType) {
-					sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr);
-				}
-				else {
-					throw new InvalidProgramException(
-						"Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types.");
-				}
-			}
-
-			return sortKeyType.isSortKeyType();
-		}
-
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-
-
-	private static int[] createIncrIntArray(int numKeys) {
-		int[] keyFields = new int[numKeys];
-		for (int i = 0; i < numKeys; i++) {
-			keyFields[i] = i;
-		}
-		return keyFields;
-	}
-
-	private static void rangeCheckFields(int[] fields, int maxAllowedField) {
-
-		for (int f : fields) {
-			if (f < 0 || f > maxAllowedField) {
-				throw new IndexOutOfBoundsException("Tuple position is out of range: " + f);
-			}
-		}
-	}
-
-	public static class IncompatibleKeysException extends Exception {
-		private static final long serialVersionUID = 1L;
-		public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different.";
-		
-		public IncompatibleKeysException(String message) {
-			super(message);
-		}
-
-		public IncompatibleKeysException(TypeInformation<?> typeInformation, TypeInformation<?> typeInformation2) {
-			super(typeInformation+" and "+typeInformation2+" are not compatible");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 1384ca2..96931b0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -21,13 +21,14 @@ package org.apache.flink.api.java.operators;
 import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
@@ -150,9 +151,9 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		Partitioner<?> customPartitioner)
 	{
 		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
-		TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 
-		Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+		Operator<Tuple2<K, T>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 
 		PartitionOperatorBase<Tuple2<K, T>> keyedPartitionedInput =
 			new PartitionOperatorBase<>(new UnaryOperatorInformation<>(typeInfoWithKey, typeInfoWithKey), pMethod, new int[]{0}, name);
@@ -160,7 +161,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		keyedPartitionedInput.setCustomPartitioner(customPartitioner);
 		keyedPartitionedInput.setParallelism(partitionDop);
 
-		return SelectorFunctionKeys.appendKeyRemover(keyedPartitionedInput, keys);
+		return KeyFunctions.appendKeyRemover(keyedPartitionedInput, keys);
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 6f8877f..a22a262 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -19,13 +19,14 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
@@ -161,13 +162,13 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
 		
-		TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
-		Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
+		Operator<Tuple2<K, T>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 		
 		PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<>(function, keys, name, inputType, typeInfoWithKey);
 		reducer.setInput(keyedInput);
 		reducer.setParallelism(parallelism);
 
-		return SelectorFunctionKeys.appendKeyRemover(reducer, keys);
+		return KeyFunctions.appendKeyRemover(reducer, keys);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
index d65bc68..c8a8684 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 07d0b9a..2453f1b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
@@ -31,7 +32,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
index c55d919..6144975 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.CodeAnalysisMode;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.sca.CodeAnalyzerException;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 5b0a368..4e6f6ff 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
index e14e06d..734456c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index fb74d1e..f620e11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 2c62732..2307c0c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index 56f34cc..30e28eb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index c8e40ce..95b5840 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 72b79a4..21f15d4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 2aa8d54..e85bb79 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index e52a5c4..46773fa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 278d706..f0e8055 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
index 1f4b44a..93cba33 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -32,8 +32,8 @@ import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.sca.TaggedValue.Input;
 import org.objectweb.asm.Type;
 import org.objectweb.asm.tree.MethodNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 5f74513..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs)
- *
- * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>}
- *     with a {@code GenericType<avro.Utf8>}.
- * All other types used by Avro are standard Java types.
- * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime.
- * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
- * by generic type infos containing Utf8 classes (which are comparable),
- *
- * This class is checked by the AvroPojoTest.
- * @param <T>
- */
-public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
-	public AvroTypeInfo(Class<T> typeClass) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass));
-	}
-
-	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
-		PojoTypeExtractor pte = new PojoTypeExtractor();
-		TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null);
-
-		if(!(ti instanceof PojoTypeInfo)) {
-			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
-		}
-		PojoTypeInfo pti =  (PojoTypeInfo) ti;
-		List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
-
-		for(int i = 0; i < pti.getArity(); i++) {
-			PojoField f = pti.getPojoFieldAt(i);
-			TypeInformation newType = f.getTypeInformation();
-			// check if type is a CharSequence
-			if(newType instanceof GenericTypeInfo) {
-				if((newType).getTypeClass().equals(CharSequence.class)) {
-					// replace the type by a org.apache.avro.util.Utf8
-					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
-				}
-			}
-			PojoField newField = new PojoField(f.getField(), newType);
-			newFields.add(newField);
-		}
-		return newFields;
-	}
-
-	private static class PojoTypeExtractor extends TypeExtractor {
-		private PojoTypeExtractor() {
-			super();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
deleted file mode 100644
index 8382831..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-/**
- * This type represents a value of one two possible types, Left or Right (a
- * disjoint union), inspired by Scala's Either type.
- *
- * @param <L>
- *            the type of Left
- * @param <R>
- *            the type of Right
- */
-public abstract class Either<L, R> {
-
-	/**
-	 * Create a Left value of Either
-	 */
-	public static <L, R> Either<L, R> Left(L value) {
-		return new Left<L, R>(value);
-	}
-
-	/**
-	 * Create a Right value of Either
-	 */
-	public static <L, R> Either<L, R> Right(R value) {
-		return new Right<L, R>(value);
-	}
-
-	/**
-	 * Retrieve the Left value of Either.
-	 * 
-	 * @return the Left value
-	 * @throws IllegalStateException
-	 *             if called on a Right
-	 */
-	public abstract L left() throws IllegalStateException;
-
-	/**
-	 * Retrieve the Right value of Either.
-	 * 
-	 * @return the Right value
-	 * @throws IllegalStateException
-	 *             if called on a Left
-	 */
-	public abstract R right() throws IllegalStateException;
-
-	/**
-	 * 
-	 * @return true if this is a Left value, false if this is a Right value
-	 */
-	public final boolean isLeft() {
-		return getClass() == Left.class;
-	}
-
-	/**
-	 * 
-	 * @return true if this is a Right value, false if this is a Left value
-	 */
-	public final boolean isRight() {
-		return getClass() == Right.class;
-	}
-
-	/**
-	 * A left value of {@link Either}
-	 *
-	 * @param <L>
-	 *            the type of Left
-	 * @param <R>
-	 *            the type of Right
-	 */
-	public static class Left<L, R> extends Either<L, R> {
-		private final L value;
-
-		public Left(L value) {
-			this.value = java.util.Objects.requireNonNull(value);
-		}
-
-		@Override
-		public L left() {
-			return value;
-		}
-
-		@Override
-		public R right() {
-			throw new IllegalStateException("Cannot retrieve Right value on a Left");
-		}
-
-		@Override
-		public boolean equals(Object object) {
-			if (object instanceof Left<?, ?>) {
-				final Left<?, ?> other = (Left<?, ?>) object;
-				return value.equals(other.value);
-			}
-			return false;
-		}
-
-		@Override
-		public int hashCode() {
-			return value.hashCode();
-		}
-
-		@Override
-		public String toString() {
-			return "Left(" + value.toString() + ")";
-		}
-
-		/**
-		 * Creates a left value of {@link Either}
-		 * 
-		 */
-		public static <L, R> Left<L, R> of(L left) {
-			return new Left<L, R>(left);
-		}
-	}
-
-	/**
-	 * A right value of {@link Either}
-	 *
-	 * @param <L>
-	 *            the type of Left
-	 * @param <R>
-	 *            the type of Right
-	 */
-	public static class Right<L, R> extends Either<L, R> {
-		private final R value;
-
-		public Right(R value) {
-			this.value = java.util.Objects.requireNonNull(value);
-		}
-
-		@Override
-		public L left() {
-			throw new IllegalStateException("Cannot retrieve Left value on a Right");
-		}
-
-		@Override
-		public R right() {
-			return value;
-		}
-
-		@Override
-		public boolean equals(Object object) {
-			if (object instanceof Right<?, ?>) {
-				final Right<?, ?> other = (Right<?, ?>) object;
-				return value.equals(other.value);
-			}
-			return false;
-		}
-
-		@Override
-		public int hashCode() {
-			return value.hashCode();
-		}
-
-		@Override
-		public String toString() {
-			return "Right(" + value.toString() + ")";
-		}
-
-		/**
-		 * Creates a right value of {@link Either}
-		 * 
-		 */
-		public static <L, R> Right<L, R> of(R right) {
-			return new Right<L, R>(right);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
deleted file mode 100644
index ec7be97..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
-
-/**
- * A {@link TypeInformation} for the {@link Either} type of the Java API.
- *
- * @param <L> the Left value type
- * @param <R> the Right value type
- */
-public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeInformation<L> leftType;
-
-	private final TypeInformation<R> rightType;
-
-	public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
-		this.leftType = leftType;
-		this.rightType = rightType;
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<Either<L, R>> getTypeClass() {
-		return (Class<Either<L, R>>) (Class<?>) Either.class;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
-		return new EitherSerializer<L, R>(leftType.createSerializer(config),
-				rightType.createSerializer(config));
-	}
-
-	@Override
-	public String toString() {
-		return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof EitherTypeInfo) {
-			EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
-
-			return other.canEqual(this) &&
-				leftType.equals(other.leftType) &&
-				rightType.equals(other.rightType);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return 17 * leftType.hashCode() + rightType.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof EitherTypeInfo;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public TypeInformation<L> getLeftType() {
-		return leftType;
-	}
-
-	public TypeInformation<R> getRightType() {
-		return rightType;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
deleted file mode 100644
index de59c36..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.EnumComparator;
-import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-
-/**
- * A {@link TypeInformation} for java enumeration types. 
- *
- * @param <T> The type represented by this type information.
- */
-public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> {
-
-	private static final long serialVersionUID = 8936740290137178660L;
-	
-	private final Class<T> typeClass;
-
-	public EnumTypeInfo(Class<T> typeClass) {
-		Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
-
-		if (!Enum.class.isAssignableFrom(typeClass) ) {
-			throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
-		}
-
-		this.typeClass = typeClass;
-	}
-
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		return new EnumComparator<T>(sortOrderAscending);
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-	
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return this.typeClass;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return true;
-	}
-
-	@Override
-	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		return new EnumSerializer<T>(typeClass);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Standard utils
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "EnumTypeInfo<" + typeClass.getName() + ">";
-	}	
-	
-	@Override
-	public int hashCode() {
-		return typeClass.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof EnumTypeInfo;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof EnumTypeInfo) {
-			@SuppressWarnings("unchecked")
-			EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
-
-			return enumTypeInfo.canEqual(this) &&
-				typeClass == enumTypeInfo.typeClass;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
deleted file mode 100644
index 7e7aa68..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-
-public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
-
-	private static final long serialVersionUID = -7959114120287706504L;
-	
-	private final Class<T> typeClass;
-
-	public GenericTypeInfo(Class<T> typeClass) {
-		this.typeClass = Preconditions.checkNotNull(typeClass);
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-	
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return typeClass;
-	}
-	
-	@Override
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(typeClass);
-	}
-
-	@Override
-	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-		return new KryoSerializer<T>(this.typeClass, config);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		if (isKeyType()) {
-			@SuppressWarnings("rawtypes")
-			GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass);
-			return (TypeComparator<T>) comparator;
-		}
-
-		throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys.");
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return typeClass.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof GenericTypeInfo;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof GenericTypeInfo) {
-			@SuppressWarnings("unchecked")
-			GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj;
-
-			return typeClass == genericTypeInfo.typeClass;
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "GenericType<" + typeClass.getCanonicalName() + ">";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
deleted file mode 100644
index f8b4247..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured
- * with the data type they will operate on. The method {@link #setInputType(org.apache.flink.api
- * .common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)} will be
- * called when the output format is used with an output method such as
- * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
- */
-public interface InputTypeConfigurable {
-
-	/**
-	 * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to
-	 * the DataSet's output method. May be used to configures the output format based on the data type.
-	 *
-	 * @param type The data type of the input.
-	 * @param executionConfig
-	 */
-	void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
deleted file mode 100644
index 1dd7f01..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * A special type information signifying that the type extraction failed. It contains
- * additional error information.
- */
-public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
-
-	private static final long serialVersionUID = -4212082837126702723L;
-	
-	private final String functionName;
-	private final InvalidTypesException typeException;
-
-	
-	public MissingTypeInfo(String functionName) {
-		this(functionName, new InvalidTypesException("An unknown error occured."));
-	}
-
-	public MissingTypeInfo(String functionName, InvalidTypesException typeException) {
-		this.functionName = functionName;
-		this.typeException = typeException;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public String getFunctionName() {
-		return functionName;
-	}
-
-	public InvalidTypesException getTypeException() {
-		return typeException;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isBasicType() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public boolean isTupleType() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public int getArity() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public Class<InvalidTypesException> getTypeClass() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public boolean isKeyType() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof MissingTypeInfo) {
-			MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
-
-			return missingTypeInfo.canEqual(this) &&
-				functionName.equals(missingTypeInfo.functionName) &&
-				typeException.equals(missingTypeInfo.typeException);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return 31 * functionName.hashCode() + typeException.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof MissingTypeInfo;
-	}
-
-	@Override
-	public int getTotalFields() {
-		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
deleted file mode 100644
index 150c976..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Array;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-
-public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> arrayType;
-	private final TypeInformation<C> componentInfo;
-
-	private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
-		this.arrayType = Preconditions.checkNotNull(arrayType);
-		this.componentInfo = Preconditions.checkNotNull(componentInfo);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<T> getTypeClass() {
-		return arrayType;
-	}
-
-	public TypeInformation<C> getComponentInfo() {
-		return componentInfo;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return false;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		return (TypeSerializer<T>) new GenericArraySerializer<C>(
-			componentInfo.getTypeClass(),
-			componentInfo.createSerializer(executionConfig));
-	}
-
-	@Override
-	public String toString() {
-		return this.getClass().getSimpleName() + "<" + this.componentInfo + ">";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ObjectArrayTypeInfo) {
-			@SuppressWarnings("unchecked")
-			ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj;
-
-			return objectArrayTypeInfo.canEqual(this) &&
-				arrayType == objectArrayTypeInfo.arrayType &&
-				componentInfo.equals(objectArrayTypeInfo.componentInfo);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof ObjectArrayTypeInfo;
-	}
-
-	@Override
-	public int hashCode() {
-		return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
-		Preconditions.checkNotNull(arrayClass);
-		Preconditions.checkNotNull(componentInfo);
-		Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
-
-		return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
-	}
-
-	/**
-	 * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
-	 * {@link TypeInformation} for the component type.
-	 *
-	 * <p>
-	 * This must be used in cases where the complete type of the array is not available as a
-	 * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
-		Preconditions.checkNotNull(componentInfo);
-
-		return new ObjectArrayTypeInfo<T, C>(
-			(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
-			componentInfo);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
deleted file mode 100644
index 1b008c0..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * Represent a field definition for {@link PojoTypeInfo} type of objects.
- */
-public class PojoField implements Serializable {
-
-	private static final long serialVersionUID = 1975295846436559363L;
-
-	private transient Field field;
-	private final TypeInformation<?> type;
-
-	public PojoField(Field field, TypeInformation<?> type) {
-		this.field = Preconditions.checkNotNull(field);
-		this.type = Preconditions.checkNotNull(type);
-	}
-
-	public Field getField() {
-		return field;
-	}
-
-	public TypeInformation<?> getTypeInformation() {
-		return type;
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeObject(field.getDeclaringClass());
-		out.writeUTF(field.getName());
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		Class<?> clazz = (Class<?>)in.readObject();
-		String fieldName = in.readUTF();
-		field = null;
-		// try superclasses as well
-		while (clazz != null) {
-			try {
-				field = clazz.getDeclaredField(fieldName);
-				field.setAccessible(true);
-				break;
-			} catch (NoSuchFieldException e) {
-				clazz = clazz.getSuperclass();
-			}
-		}
-		if (field == null) {
-			throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-					+ " (" + fieldName + ")");
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof PojoField) {
-			PojoField other = (PojoField) obj;
-
-			return other.canEqual(this) && type.equals(other.type) &&
-				Objects.equals(field, other.field);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(field, type);
-	}
-
-	public boolean canEqual(Object obj) {
-		return obj instanceof PojoField;
-	}
-}
\ No newline at end of file


Mime
View raw message