flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/2] flink git commit: [FLINK-3169] Remove Key class
Date Tue, 08 Mar 2016 12:49:51 GMT
[FLINK-3169] Remove Key class

This closes #1667.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe117afd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe117afd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe117afd

Branch: refs/heads/master
Commit: fe117afd31e823006f680d145c8f75033816ed17
Parents: d7aa989
Author: zentol <s.motsu@web.de>
Authored: Tue Feb 16 12:23:41 2016 +0100
Committer: zentol <s.motsu@web.de>
Committed: Tue Mar 8 13:49:01 2016 +0100

----------------------------------------------------------------------
 .../distributions/SimpleDistribution.java       | 182 --------------
 .../SimpleIntegerDistribution.java              | 160 ------------
 .../UniformDoubleDistribution.java              |  65 -----
 .../UniformIntegerDistribution.java             |  66 -----
 .../flink/api/common/operators/Ordering.java    |  11 +-
 .../org/apache/flink/types/BooleanValue.java    |   2 -
 .../java/org/apache/flink/types/ByteValue.java  |   2 -
 .../java/org/apache/flink/types/CharValue.java  |   2 -
 .../org/apache/flink/types/DoubleValue.java     |   4 +-
 .../java/org/apache/flink/types/FloatValue.java |   4 +-
 .../java/org/apache/flink/types/IntValue.java   |   2 -
 .../main/java/org/apache/flink/types/Key.java   |  59 -----
 .../java/org/apache/flink/types/LongValue.java  |   2 -
 .../org/apache/flink/types/NormalizableKey.java |   4 +-
 .../java/org/apache/flink/types/NullValue.java  |   2 -
 .../main/java/org/apache/flink/types/Pair.java  | 177 --------------
 .../java/org/apache/flink/types/ShortValue.java |   2 -
 .../org/apache/flink/types/StringValue.java     |   3 +-
 .../SimpleDataDistributionTest.java             | 228 -----------------
 .../flink/types/CollectionsDataTypeTest.java    | 243 -------------------
 .../flink/optimizer/postpass/PostPassUtils.java |  47 ----
 .../optimizer/postpass/SparseKeySchema.java     |  86 -------
 .../dataproperties/MockDistribution.java        |   5 +-
 .../runtime/operators/CachedMatchTaskTest.java  |   6 +-
 .../operators/CoGroupTaskExternalITCase.java    |   6 +-
 .../runtime/operators/CoGroupTaskTest.java      |   6 +-
 .../operators/CombineTaskExternalITCase.java    |   4 +-
 .../operators/JoinTaskExternalITCase.java       |   6 +-
 .../flink/runtime/operators/JoinTaskTest.java   |   6 +-
 .../operators/ReduceTaskExternalITCase.java     |   5 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   4 +-
 .../runtime/operators/hash/HashTableITCase.java |   4 +-
 .../testutils/recordutils/RecordComparator.java |  46 ++--
 .../recordutils/RecordComparatorFactory.java    |  16 +-
 .../recordutils/RecordPairComparator.java       |  18 +-
 .../RecordPairComparatorFactory.java            |  12 +-
 .../org/apache/flink/test/util/CoordVector.java |   4 +-
 37 files changed, 84 insertions(+), 1417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
deleted file mode 100644
index bf759d9..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.InstantiationUtil;
-
-@PublicEvolving
-public class SimpleDistribution implements DataDistribution {
-	
-	private static final long serialVersionUID = 1L;
-
-	protected Key<?>[][] boundaries; 
-	
-	protected int dim;
-	
-	
-	public SimpleDistribution() {
-		boundaries = new Key[0][];
-	}
-	
-	public SimpleDistribution(Key<?>[] bucketBoundaries) {
-		if (bucketBoundaries == null) {
-			throw new IllegalArgumentException("Bucket boundaries must not be null.");
-		}
-		if (bucketBoundaries.length == 0) {
-			throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-		}
-
-		// dimensionality is one in this case
-		dim = 1;
-		
-		@SuppressWarnings("unchecked")
-		Class<? extends Key<?>> clazz = (Class<? extends Key<?>>) bucketBoundaries[0].getClass();
-		
-		// make the array 2-dimensional
-		boundaries = new Key[bucketBoundaries.length][];
-		for (int i = 0; i < bucketBoundaries.length; i++) {
-			if (bucketBoundaries[i].getClass() != clazz) {
-				throw new IllegalArgumentException("The bucket boundaries are of different class types.");
-			}
-			
-			boundaries[i] = new Key[] { bucketBoundaries[i] };
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	public SimpleDistribution(Key<?>[][] bucketBoundaries) {
-		if (bucketBoundaries == null) {
-			throw new IllegalArgumentException("Bucket boundaries must not be null.");
-		}
-		if (bucketBoundaries.length == 0) {
-			throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-		}
-
-		// dimensionality is the number of fields in the first bucket boundary
-		dim = bucketBoundaries[0].length;
-		
-		Class<? extends Key<?>>[] types = new Class[dim];
-		for (int i = 0; i < dim; i++) {
-			types[i] = (Class<? extends Key<?>>) bucketBoundaries[0][i].getClass();
-		}
-		
-		// check the array
-		for (int i = 1; i < bucketBoundaries.length; i++) {
-			if (bucketBoundaries[i].length != dim) {
-				throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality.");
-			}
-			for (int d = 0; d < dim; d++) {
-				if (types[d] != bucketBoundaries[i][d].getClass()) {
-					throw new IllegalArgumentException("The bucket boundaries are of different class types.");
-				}
-			}
-		}
-		
-		boundaries = bucketBoundaries;
-	}
-	
-	@Override
-	public int getNumberOfFields() {
-		return this.dim;
-	}
-
-	@Override
-	public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		// check validity of arguments
-		if(bucketNum < 0) {
-			throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0.");
-		} else if(bucketNum >= (totalNumBuckets - 1)) {
-			throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1.");
-		}
-		if(totalNumBuckets < 1) {
-			throw new IllegalArgumentException("Total number of bucket must be larger than 0.");
-		}
-		
-		final int maxNumBuckets = boundaries.length + 1;
-		
-		// check if max number of buckets is equal to or an even multiple of the requested number of buckets
-		if((maxNumBuckets % totalNumBuckets) == 0) {
-			// easy case, just use each n-th boundary
-			final int n = maxNumBuckets / totalNumBuckets;
-			final int bucketId = bucketNum * n + (n -  1); 
-			
-			return boundaries[bucketId];
-		} else {
-			throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. " +
-					"Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets.");
-			// TODO: might be relaxed if much more boundary records are available than requested
-		}
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.dim);
-		out.writeInt(boundaries.length);
-		
-		// write types
-		for (int i = 0; i < dim; i++) {
-			out.writeUTF(boundaries[0][i].getClass().getName());
-		}
-		
-		for (int i = 0; i < boundaries.length; i++) {
-			for (int d = 0; d < dim; d++) {
-				boundaries[i][d].write(out);
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.dim = in.readInt();
-		final int len = in.readInt();
-		
-		boundaries = new Key[len][];
-		
-		// read types
-		Class<? extends Key<?>>[] types = new Class[dim];
-		for (int i = 0; i < dim; i++) {
-			String className = in.readUTF();
-			try {
-				types[i] = (Class<? extends Key<?>>) Class.forName(className, true, getClass().getClassLoader()).asSubclass(Key.class);
-			} catch (ClassNotFoundException e) {
-				throw new IOException("Could not load type class '" + className + "'.");
-			} catch (Throwable t) {
-				throw new IOException("Error loading type class '" + className + "'.", t);
-			}
-		}
-		
-		for (int i = 0; i < len; i++) {
-			Key<?>[] bucket = new Key[dim]; 
-			for (int d = 0; d < dim; d++) {
-				Key<?> val = InstantiationUtil.instantiate(types[d], Key.class);
-				val.read(in);
-				bucket[d] = val;
-			}
-			
-			boundaries[i] = bucket;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
deleted file mode 100644
index cd1a8c5..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-@PublicEvolving
-public class SimpleIntegerDistribution extends SimpleDistribution {
-	
-	private static final long serialVersionUID = 1L;
-	
-	
-	public SimpleIntegerDistribution() {
-		boundaries = new IntValue[0][];
-	}
-	
-	public SimpleIntegerDistribution(int[] bucketBoundaries) {
-		if (bucketBoundaries == null) {
-			throw new IllegalArgumentException("Bucket boundaries must not be null.");
-		}
-		if (bucketBoundaries.length == 0) {
-			throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-		}
-
-		// dimensionality is one in this case
-		dim = 1;
-		
-		// make the array 2-dimensional
-		boundaries = packIntegers(bucketBoundaries);
-	}
-	
-	public SimpleIntegerDistribution(IntValue[] bucketBoundaries) {
-		if (bucketBoundaries == null) {
-			throw new IllegalArgumentException("Bucket boundaries must not be null.");
-		}
-		if (bucketBoundaries.length == 0) {
-			throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-		}
-
-		// dimensionality is one in this case
-		dim = 1;
-		
-		// make the array 2-dimensional
-		boundaries = new IntValue[bucketBoundaries.length][];
-		for (int i = 0; i < bucketBoundaries.length; i++) {
-			boundaries[i] = new IntValue[] { bucketBoundaries[i] };
-		}
-	}
-	
-	public SimpleIntegerDistribution(IntValue[][] bucketBoundaries) {
-		if (bucketBoundaries == null) {
-			throw new IllegalArgumentException("Bucket boundaries must not be null.");
-		}
-		if (bucketBoundaries.length == 0) {
-			throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-		}
-
-		// dimensionality is the number of fields in the first bucket boundary
-		dim = bucketBoundaries[0].length;
-		
-		// check the array
-		for (int i = 1; i < bucketBoundaries.length; i++) {
-			if (bucketBoundaries[i].length != dim) {
-				throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality.");
-			}
-		}
-	}
-	
-	@Override
-	public int getNumberOfFields() {
-		return this.dim;
-	}
-
-	@Override
-	public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		// check validity of arguments
-		if(bucketNum < 0) {
-			throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0.");
-		} else if(bucketNum >= (totalNumBuckets - 1)) {
-			throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1.");
-		}
-		if(totalNumBuckets < 1) {
-			throw new IllegalArgumentException("Total number of bucket must be larger than 0.");
-		}
-		
-		final int maxNumBuckets = boundaries.length + 1;
-		
-		// check if max number of buckets is equal to or an even multiple of the requested number of buckets
-		if((maxNumBuckets % totalNumBuckets) == 0) {
-			// easy case, just use each n-th boundary
-			final int n = maxNumBuckets / totalNumBuckets;
-			final int bucketId = bucketNum * n + (n -  1); 
-			
-			return (IntValue[]) boundaries[bucketId];
-		} else {
-			throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. " +
-					"Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets.");
-			// TODO: might be relaxed if much more boundary records are available than requested
-		}
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.dim);
-		out.writeInt(boundaries.length);
-		
-		for (int i = 0; i < boundaries.length; i++) {
-			for (int d = 0; d < dim; d++) {
-				out.writeInt(((IntValue) boundaries[i][d]).getValue());
-			}
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.dim = in.readInt();
-		final int len = in.readInt();
-		
-		boundaries = new IntValue[len][];
-		
-		for (int i = 0; i < len; i++) {
-			IntValue[] bucket = new IntValue[dim]; 
-			for (int d = 0; d < dim; d++) {
-				bucket[d] = new IntValue(in.readInt());
-			}
-			
-			boundaries[i] = bucket;
-		}
-	}
-	
-	private static IntValue[][] packIntegers(int[] values) {
-		IntValue[][] packed = new IntValue[values.length][];
-		for (int i = 0; i < values.length; i++) {
-			packed[i] = new IntValue[] { new IntValue(values[i]) };
-		}
-		
-		return packed;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
deleted file mode 100644
index df1f095..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-
-@PublicEvolving
-public class UniformDoubleDistribution implements DataDistribution {
-
-	private static final long serialVersionUID = 1L;
-	
-	private double min, max; 
-
-	
-	public UniformDoubleDistribution() {}
-	
-	public UniformDoubleDistribution(double min, double max) {
-		this.min = min;
-		this.max = max;
-	}
-
-	@Override
-	public DoubleValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		double bucketSize = (max - min) / totalNumBuckets;
-		return new DoubleValue[] {new DoubleValue(min + (bucketNum+1) * bucketSize) };
-	}
-
-	@Override
-	public int getNumberOfFields() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeDouble(min);
-		out.writeDouble(max);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		min = in.readDouble();
-		max = in.readDouble();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
deleted file mode 100644
index 504f65b..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-@PublicEvolving
-public class UniformIntegerDistribution implements DataDistribution {
-
-	private static final long serialVersionUID = 1L;
-	
-	private int min, max; 
-
-	
-	public UniformIntegerDistribution() {}
-	
-	public UniformIntegerDistribution(int min, int max) {
-		this.min = min;
-		this.max = max;
-	}
-
-	@Override
-	public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		long diff = ((long) max) - ((long) min) + 1;
-		double bucketSize = diff / ((double) totalNumBuckets);
-		return new IntValue[] {new IntValue(min + (int) ((bucketNum+1) * bucketSize)) };
-	}
-
-	@Override
-	public int getNumberOfFields() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(min);
-		out.writeInt(max);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		min = in.readInt();
-		max = in.readInt();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 23928b3..7332698 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.types.Key;
 
 /**
  * This class represents an ordering on a set of fields. It specifies the fields and order direction
@@ -34,7 +33,7 @@ public class Ordering implements Cloneable {
 	
 	protected FieldList indexes = new FieldList();
 	
-	protected final ArrayList<Class<? extends Key<?>>> types = new ArrayList<Class<? extends Key<?>>>();
+	protected final ArrayList<Class<? extends Comparable<?>>> types = new ArrayList<Class<? extends Comparable<?>>>();
 	
 	protected final ArrayList<Order> orders = new ArrayList<Order>();
 
@@ -50,7 +49,7 @@ public class Ordering implements Cloneable {
 	 * @param type
 	 * @param order
 	 */
-	public Ordering(int index, Class<? extends Key<?>> type, Order order) {
+	public Ordering(int index, Class<? extends Comparable<?>> type, Order order) {
 		appendOrdering(index, type, order);
 	}
 	
@@ -63,7 +62,7 @@ public class Ordering implements Cloneable {
 	 * 
 	 * @return This ordering with an additional appended order requirement.
 	 */
-	public Ordering appendOrdering(Integer index, Class<? extends Key<?>> type, Order order) {
+	public Ordering appendOrdering(Integer index, Class<? extends Comparable<?>> type, Order order) {
 		if (index.intValue() < 0) {
 			throw new IllegalArgumentException("The key index must not be negative.");
 		}
@@ -97,7 +96,7 @@ public class Ordering implements Cloneable {
 		return this.indexes.get(index);
 	}
 	
-	public Class<? extends Key<?>> getType(int index) {
+	public Class<? extends Comparable<?>> getType(int index) {
 		if (index < 0 || index >= this.types.size()) {
 			throw new IndexOutOfBoundsException(String.valueOf(index));
 		}
@@ -114,7 +113,7 @@ public class Ordering implements Cloneable {
 	// --------------------------------------------------------------------------------------------
 	
 	@SuppressWarnings("unchecked")
-	public Class<? extends Key<?>>[] getTypes() {
+	public Class<? extends Comparable<?>>[] getTypes() {
 		return this.types.toArray(new Class[this.types.size()]);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
index 10ca069..4bc387b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable boolean type, representing the primitive
  * type {@code boolean}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableValue<BooleanValue>, CopyableValue<BooleanValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
index a1a9e32..cb1c669 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable byte type, representing the primitive
  * type {@code byte} (signed 8 bit integer).
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<ByteValue>, CopyableValue<ByteValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/CharValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
index 8b2ab29..f800832 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable character type, representing the primitive
  * type {@code char}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class CharValue implements NormalizableKey<CharValue>, ResettableValue<CharValue>, CopyableValue<CharValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
index 80fb77b..7dba25b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
@@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * Boxed serializable and comparable double precision floating point type, representing the primitive
  * type {@code double}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
-public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> {
+public class DoubleValue implements Comparable<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> {
 	private static final long serialVersionUID = 1L;
 
 	private double value;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
index 371ce52..e3e49ce 100644
--- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
@@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * Boxed serializable and comparable single precision floating point type, representing the primitive
  * type {@code float}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
-public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> {
+public class FloatValue implements Comparable<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> {
 	private static final long serialVersionUID = 1L;
 
 	private float value;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/IntValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
index 0f63117..347fd1d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable integer type, representing the primitive
  * type {@code int}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntValue>, CopyableValue<IntValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Key.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Key.java b/flink-core/src/main/java/org/apache/flink/types/Key.java
deleted file mode 100644
index c1e0626..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/Key.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * This interface has to be implemented by all data types that act as key. Keys are used to establish
- * relationships between values. A key must always be {@link java.lang.Comparable} to other keys of
- * the same type. In addition, keys must implement a correct {@link java.lang.Object#hashCode()} method
- * and {@link java.lang.Object#equals(Object)} method to ensure that grouping on keys works properly.
- * <p>
- * This interface extends {@link org.apache.flink.types.Value} and requires to implement
- * the serialization of its value.
- * 
- * @see org.apache.flink.types.Value
- * @see org.apache.flink.core.io.IOReadableWritable
- * @see java.lang.Comparable
- * 
- * @deprecated The Key type is a relict of a deprecated and removed API and will be removed
- *             in future versions as well.
- */
-@Deprecated
-@PublicEvolving
-public interface Key<T> extends Value, Comparable<T> {
-	
-	/**
-	 * All keys must override the hash-code function to generate proper deterministic hash codes,
-	 * based on their contents.
-	 * 
-	 * @return The hash code of the key
-	 */
-	public int hashCode();
-	
-	/**
-	 * Compares the object on equality with another object.
-	 * 
-	 * @param other The other object to compare against.
-	 * 
-	 * @return True, iff this object is identical to the other object, false otherwise.
-	 */
-	public boolean equals(Object other);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/LongValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
index ef93864..dfb7845 100644
--- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable long integer type, representing the primitive
  * type {@code long}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class LongValue implements NormalizableKey<LongValue>, ResettableValue<LongValue>, CopyableValue<LongValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
index 6c5e204..e040599 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
@@ -36,8 +36,8 @@ import org.apache.flink.core.memory.MemorySegment;
  * key length.
  */
 @Public
-public interface NormalizableKey<T> extends Key<T> {
-	
+public interface NormalizableKey<T> extends Comparable<T> {
+
 	/**
 	 * Gets the maximal length of normalized keys that the data type would produce to determine
 	 * the order of instances solely by the normalized key. A value of {@link java.lang.Integer}.MAX_VALUE

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NullValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
index 9a3885d..e188929 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
@@ -28,8 +28,6 @@ import org.apache.flink.core.memory.MemorySegment;
 
 /**
  * Null base type for programs that implements the Key interface.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public final class NullValue implements NormalizableKey<NullValue>, CopyableValue<NullValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Pair.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Pair.java b/flink-core/src/main/java/org/apache/flink/types/Pair.java
deleted file mode 100644
index 9bf2387..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/Pair.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.types;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Generic pair base type. 
- * 
- * @see org.apache.flink.types.Key
- * 
- * @param <U> Type of the pair's first element.
- * @param <V> Type of the pair's second element.
- */
-@PublicEvolving
-public abstract class Pair<U extends Key<U>, V extends Key<V>> implements Key<Pair<U, V>> {
-	private static final long serialVersionUID = 1L;
-	
-	// class of the first pair element
-	private final Class<U> firstClass;
-	// class of the second pair element
-	private final Class<V> secondClass;
-
-	// the first pair element
-	private U first;
-	// the second pair element
-	private V second;
-
-	/**
-	 * Initializes both encapsulated pair elements with empty objects.
-	 */
-	public Pair() {
-		this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass());
-		this.secondClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
-
-		try {
-			this.first = this.firstClass.newInstance();
-			this.second = this.secondClass.newInstance();
-		} catch (final InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (final IllegalAccessException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Initializes the encapsulated pair elements with the provided values.
-	 * 
-	 * @param first Initial value of the first encapsulated pair element.
-	 * @param second Initial value of the second encapsulated pair element.
-	 */
-	public Pair(U first, V second) {
-		this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass());
-		this.secondClass = ReflectionUtil.<V> getTemplateType1(this.getClass());
-
-		this.first = first;
-		this.second = second;
-	}
-
-	/**
-	 * Returns the first encapsulated pair element.
-	 * 
-	 * @return The first encapsulated pair element.
-	 */
-	public U getFirst() {
-		return this.first;
-	}
-
-	/**
-	 * Sets the first encapsulated pair element to the specified value.
-	 * 
-	 * @param first
-	 *        The new value of the first encapsulated pair element.
-	 */
-	public void setFirst(final U first) {
-		if (first == null) {
-			throw new NullPointerException("first must not be null");
-		}
-
-		this.first = first;
-	}
-
-	/**
-	 * Returns the second encapsulated pair element.
-	 * 
-	 * @return The second encapsulated pair element.
-	 */
-	public V getSecond() {
-		return this.second;
-	}
-
-	/**
-	 * Sets the second encapsulated pair element to the specified value.
-	 * 
-	 * @param second
-	 *        The new value of the second encapsulated pair element.
-	 */
-	public void setSecond(final V second) {
-		if (second == null) {
-			throw new NullPointerException("second must not be null");
-		}
-
-		this.second = second;
-	}
-
-	@Override
-	public String toString() {
-		return "<" + this.first.toString() + "|" + this.second.toString() + ">";
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		this.first.read(in);
-		this.second.read(in);
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		this.first.write(out);
-		this.second.write(out);
-	}
-
-	@Override
-	public int compareTo(Pair<U, V> o) {
-		int result = this.first.compareTo(o.first);
-		if (result == 0) {
-			result = this.second.compareTo(o.second);
-		}
-		return result;
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + this.first.hashCode();
-		result = prime * result + this.second.hashCode();
-		return result;
-	}
-
-	@Override
-	public boolean equals(final Object obj) {
-		if (this == obj) {
-			return true;
-		}
-		if (obj == null) {
-			return false;
-		}
-		if (this.getClass() != obj.getClass()) {
-			return false;
-		}
-		final Pair<?, ?> other = (Pair<?, ?>) obj;
-		return this.first.equals(other.first) && this.second.equals(other.second);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
index b91aaac..970e7e4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable short integer type, representing the primitive
  * type {@code short}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<ShortValue>, CopyableValue<ShortValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index 0f9105c..e20083e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -37,8 +37,7 @@ import com.google.common.base.Preconditions;
  * The mutability allows to reuse the object inside the user code, also across invocations. Reusing a StringValue object
  * helps to increase the performance, as string objects are rather heavy-weight objects and incur a lot of garbage
  * collection overhead, if created and destroyed in masses.
- * 
- * @see org.apache.flink.types.Key
+ *
  * @see org.apache.flink.types.NormalizableKey
  * @see java.lang.String
  * @see java.lang.CharSequence

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java b/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
deleted file mode 100644
index a58e6ab..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.distributions;
-
-import org.apache.flink.api.common.distributions.SimpleDistribution;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.StringValue;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class SimpleDataDistributionTest {
-
-	@Test
-	public void testConstructorSingleKey() {
-
-		// check correct data distribution
-		try {
-			SimpleDistribution dd = new SimpleDistribution(new Key<?>[] {new IntValue(1), new IntValue(2), new IntValue(3)});
-			Assert.assertEquals(1, dd.getNumberOfFields());
-		}
-		catch (Throwable t) {
-			Assert.fail();
-		}
-		
-		// check incorrect key types
-		try {
-			new SimpleDistribution(new Key<?>[] {new IntValue(1), new StringValue("ABC"), new IntValue(3)});
-			Assert.fail("Data distribution accepts inconsistent key types");
-		} catch(IllegalArgumentException iae) {
-			// do nothing
-		}
-		
-		// check inconsistent number of keys
-		try {
-			new SimpleDistribution(new Key<?>[][] {{new IntValue(1)}, {new IntValue(2), new IntValue(2)}, {new IntValue(3)}});
-			Assert.fail("Data distribution accepts inconsistent many keys");
-		} catch(IllegalArgumentException iae) {
-			// do nothing
-		}
-	}
-	
-	@Test 
-	public void testConstructorMultiKey() {
-		
-		// check correct data distribution
-		SimpleDistribution dd = new SimpleDistribution(
-				new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
-							{new IntValue(2), new StringValue("A"), new IntValue(1)}, 
-							{new IntValue(3), new StringValue("A"), new IntValue(1)}});
-		Assert.assertEquals(3, dd.getNumberOfFields());
-		
-		// check inconsistent key types
-		try {
-			new SimpleDistribution( 
-					new Key<?>[][] {{new IntValue(1), new StringValue("A"), new DoubleValue(1.3d)}, 
-								{new IntValue(2), new StringValue("B"), new IntValue(1)}});
-			Assert.fail("Data distribution accepts incorrect key types");
-		} catch(IllegalArgumentException iae) {
-			// do nothing
-		}
-		
-		// check inconsistent number of keys
-		try {
-			new SimpleDistribution(
-					new Key<?>[][] {{new IntValue(1), new IntValue(2)}, 
-								{new IntValue(2), new IntValue(2)}, 
-								{new IntValue(3)}});
-			Assert.fail("Data distribution accepts bucket boundaries with inconsistent many keys");
-		} catch(IllegalArgumentException iae) {
-			// do nothing
-		}
-		
-	}
-	
-	@Test
-	public void testWriteRead() {
-		
-		SimpleDistribution ddWrite = new SimpleDistribution(
-				new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
-							{new IntValue(2), new StringValue("A"), new IntValue(1)}, 
-							{new IntValue(2), new StringValue("B"), new IntValue(4)},
-							{new IntValue(2), new StringValue("B"), new IntValue(3)},
-							{new IntValue(2), new StringValue("B"), new IntValue(2)}});
-		Assert.assertEquals(3, ddWrite.getNumberOfFields());
-		
-		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		
-		try {
-			ddWrite.write(new DataOutputViewStreamWrapper(baos));
-		} catch (IOException e) {
-			Assert.fail("Error serializing the DataDistribution: " + e.getMessage());
-		}
-
-		byte[] seralizedDD = baos.toByteArray();
-		
-		final ByteArrayInputStream bais = new ByteArrayInputStream(seralizedDD);
-		
-		SimpleDistribution ddRead = new SimpleDistribution();
-		
-		try {
-			ddRead.read(new DataInputViewStreamWrapper(bais));
-		} catch (Exception ex) {
-			Assert.fail("The deserialization of the encoded data distribution caused an error");
-		}
-		
-		Assert.assertEquals(3, ddRead.getNumberOfFields());
-		
-		// compare written and read distributions
-		for(int i=0;i<6;i++) {
-			Key<?>[] recW = ddWrite.getBucketBoundary(0, 6);
-			Key<?>[] recR = ddWrite.getBucketBoundary(0, 6);
-			
-			Assert.assertEquals(recW[0], recR[0]);
-			Assert.assertEquals(recW[1], recR[1]);
-			Assert.assertEquals(recW[2], recR[2]);
-		}
-	}
-	
-	@Test
-	public void testGetBucketBoundary() {
-		
-		SimpleDistribution dd = new SimpleDistribution(
-				new Key<?>[][] {{new IntValue(1), new StringValue("A")}, 
-							{new IntValue(2), new StringValue("B")}, 
-							{new IntValue(3), new StringValue("C")},
-							{new IntValue(4), new StringValue("D")},
-							{new IntValue(5), new StringValue("E")},
-							{new IntValue(6), new StringValue("F")},
-							{new IntValue(7), new StringValue("G")}});
-		
-		Key<?>[] boundRec = dd.getBucketBoundary(0, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 1);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("A"));
-		
-		boundRec = dd.getBucketBoundary(1, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B"));
-		
-		boundRec = dd.getBucketBoundary(2, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 3);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("C"));
-		
-		boundRec = dd.getBucketBoundary(3, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-		
-		boundRec = dd.getBucketBoundary(4, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 5);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("E"));
-		
-		boundRec = dd.getBucketBoundary(5, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F"));
-		
-		boundRec = dd.getBucketBoundary(6, 8);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 7);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("G"));
-		
-		boundRec = dd.getBucketBoundary(0, 4);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B"));
-		
-		boundRec = dd.getBucketBoundary(1, 4);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-		
-		boundRec = dd.getBucketBoundary(2, 4);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F"));
-		
-		boundRec = dd.getBucketBoundary(0, 2);
-		Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-		
-		try {
-			dd.getBucketBoundary(0, 7);
-			Assert.fail();
-		} catch(IllegalArgumentException iae) {
-			// nothing to do
-		}
-		
-		try {
-			dd.getBucketBoundary(3, 4);
-			Assert.fail();
-		} catch(IllegalArgumentException iae) {
-			// nothing to do
-		}
-		
-		try {
-			dd.getBucketBoundary(-1, 4);
-			Assert.fail();
-		} catch(IllegalArgumentException iae) {
-			// nothing to do
-		}
-		
-		try {
-			dd.getBucketBoundary(0, 0);
-			Assert.fail();
-		} catch(IllegalArgumentException iae) {
-			// nothing to do
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
deleted file mode 100644
index b61dd6e..0000000
--- a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map.Entry;
-
-public class CollectionsDataTypeTest {
-	
-	private DataOutputView out;
-
-	private DataInputView in;
-
-	@Before
-	public void setup() throws Exception {
-		PipedInputStream input = new PipedInputStream(1000);
-		in = new DataInputViewStreamWrapper(input);
-		out = new DataOutputViewStreamWrapper(new PipedOutputStream(input));
-	}
-
-	@Test
-	public void testPair() {
-		NfIntStringPair pair1 = new NfIntStringPair();
-
-		pair1.setFirst(new IntValue(10));
-		pair1.setSecond(new StringValue("This is a string"));
-
-		// test data retrieval
-		Assert.assertEquals(pair1.getFirst(), new IntValue(10));
-		Assert.assertEquals(pair1.getSecond(), new StringValue("This is a string"));
-
-		// test serialization
-		try {
-			NfIntStringPair mPairActual = new NfIntStringPair();
-
-			pair1.write(out);
-			mPairActual.read(in);
-
-			Assert.assertEquals(pair1, mPairActual);
-		} catch (IOException e) {
-			Assert.fail("Unexpected IOException");
-		}
-
-		// test comparison
-		NfIntStringPair pair2 = new NfIntStringPair();
-		NfIntStringPair pair3 = new NfIntStringPair();
-		NfIntStringPair pair4 = new NfIntStringPair();
-		NfIntStringPair pair5 = new NfIntStringPair();
-		NfIntStringPair pair6 = new NfIntStringPair();
-
-		pair2.setFirst(new IntValue(10));
-		pair2.setSecond(new StringValue("This is a string"));
-
-		pair3.setFirst(new IntValue(5));
-		pair3.setSecond(new StringValue("This is a string"));
-
-		pair4.setFirst(new IntValue(15));
-		pair4.setSecond(new StringValue("This is a string"));
-
-		pair5.setFirst(new IntValue(10));
-		pair5.setSecond(new StringValue("This is a strina"));
-
-		pair6.setFirst(new IntValue(10));
-		pair6.setSecond(new StringValue("This is a strinz"));
-
-		Assert.assertTrue(pair1.compareTo(pair2) == 0);
-		Assert.assertTrue(pair1.compareTo(pair3) > 0);
-		Assert.assertTrue(pair1.compareTo(pair4) < 0);
-		Assert.assertTrue(pair1.compareTo(pair5) > 0);
-		Assert.assertTrue(pair1.compareTo(pair6) < 0);
-
-		Assert.assertTrue(pair1.equals(pair2));
-		Assert.assertFalse(pair1.equals(pair3));
-		Assert.assertFalse(pair1.equals(pair4));
-		Assert.assertFalse(pair1.equals(pair5));
-		Assert.assertFalse(pair1.equals(pair6));
-
-		// test incorrect comparison
-		NfDoubleStringPair mPair7 = new NfDoubleStringPair();
-		mPair7.setFirst(new DoubleValue(2.3));
-
-		// this is caught by the compiler now
-//		try {
-//			pair1.compareTo(mPair7);
-//			Assert.fail();
-//		} catch (Exception e) {
-//			Assert.assertTrue(e instanceof ClassCastException);
-//		}
-
-		// test sorting
-		NfIntStringPair[] pairs = new NfIntStringPair[5];
-
-		pairs[0] = pair1;
-		pairs[1] = pair2;
-		pairs[2] = pair3;
-		pairs[3] = pair4;
-		pairs[4] = pair5;
-
-		Arrays.sort(pairs);
-
-		NfIntStringPair p1, p2;
-
-		for (int i = 1; i < 5; i++) {
-			p1 = pairs[i - 1];
-			p2 = pairs[i];
-
-			Assert.assertTrue(p1.compareTo(p2) <= 0);
-		}
-
-		// test hashing
-		HashSet<NfIntStringPair> pairSet = new HashSet<NfIntStringPair>();
-
-		Assert.assertTrue(pairSet.add(pair2));
-		Assert.assertTrue(pairSet.add(pair3));
-		Assert.assertTrue(pairSet.add(pair4));
-		Assert.assertTrue(pairSet.add(pair5));
-		Assert.assertTrue(pairSet.add(pair6));
-		Assert.assertFalse(pairSet.add(pair1));
-
-		Assert.assertTrue(pairSet.containsAll(Arrays.asList(pairs)));
-	}
-
-	@Test
-	public void testPactMap() {
-		NfIntStringMap map0 = new NfIntStringMap();
-		map0.put(new IntValue(10), new StringValue("20"));
-
-		// test data retrieval
-		for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-			Assert.assertEquals(entry.getValue(), new StringValue("20"));
-			Assert.assertEquals(entry.getKey(), new IntValue(10));
-		}
-
-		// test data overwriting
-		map0.put(new IntValue(10), new StringValue("10"));
-		for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-			Assert.assertEquals(entry.getValue(), new StringValue("10"));
-			Assert.assertEquals(entry.getKey(), new IntValue(10));
-		}
-
-		// now test data retrieval of multiple values
-		map0.put(new IntValue(20), new StringValue("20"));
-		map0.put(new IntValue(30), new StringValue("30"));
-		map0.put(new IntValue(40), new StringValue("40"));
-
-		// construct an inverted map
-		NfStringIntMap mapinv = new NfStringIntMap();
-		for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-			Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString()));
-			mapinv.put(entry.getValue(), entry.getKey());
-		}
-
-		for (Entry<StringValue, IntValue> entry : mapinv.entrySet()) {
-			Assert.assertEquals(entry.getValue().getValue(), Integer.parseInt(entry.getKey().toString()));
-		}
-
-		// now test data transfer
-		NfIntStringMap nMap = new NfIntStringMap();
-		try {
-			map0.write(out);
-			nMap.read(in);
-		} catch (Exception e) {
-			Assert.fail();
-		}
-		for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-			Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString()));
-		}
-	}
-
-	@Test
-	public void testPactList() {
-		NfStringList list = new NfStringList();
-		list.add(new StringValue("Hello!"));
-
-		for (StringValue value : list) {
-			Assert.assertEquals(value, new StringValue("Hello!"));
-		}
-
-		list.add(new StringValue("Hello2!"));
-		list.add(new StringValue("Hello3!"));
-		list.add(new StringValue("Hello4!"));
-
-		// test data transfer
-		NfStringList mList2 = new NfStringList();
-		try {
-			list.write(out);
-			mList2.read(in);
-		}
-		catch (Exception e) {
-			Assert.fail();
-		}
-		Assert.assertTrue(list.equals(mList2));
-	}
-
-	private class NfIntStringPair extends Pair<IntValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	private class NfDoubleStringPair extends Pair<DoubleValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	private class NfStringList extends ListValue<StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	private class NfIntStringMap extends MapValue<IntValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	private class NfStringIntMap extends MapValue<StringValue, IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
deleted file mode 100644
index 1fc4c34..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.types.Key;
-
-
-public class PostPassUtils {
-
-	public static <X> Class<? extends Key<?>>[] getKeys(AbstractSchema<Class< ? extends X>> schema, int[] fields) throws MissingFieldTypeInfoException {
-		@SuppressWarnings("unchecked")
-		Class<? extends Key<?>>[] keyTypes = new Class[fields.length];
-		
-		for (int i = 0; i < fields.length; i++) {
-			Class<? extends X> type = schema.getType(fields[i]);
-			if (type == null) {
-				throw new MissingFieldTypeInfoException(i);
-			} else if (Key.class.isAssignableFrom(type)) {
-				@SuppressWarnings("unchecked")
-				Class<? extends Key<?>> keyType = (Class<? extends Key<?>>) type;
-				keyTypes[i] = keyType;
-			} else {
-				throw new CompilerException("The field type " + type.getName() +
-						" cannot be used as a key because it does not implement the interface 'Key'");
-			}
-		}
-		
-		return keyTypes;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
deleted file mode 100644
index 1545d6f..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.types.Key;
-
-/**
- * Class encapsulating a schema map (int column position -&gt; column type) and a reference counter.
- */
-public class SparseKeySchema extends AbstractSchema<Class<? extends Key<?>>> {
-	
-	private final Map<Integer, Class<? extends Key<?>>> schema;
-	
-	
-	public SparseKeySchema() {
-		this.schema = new HashMap<Integer, Class<? extends Key<?>>>();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void addType(int key, Class<? extends Key<?>> type) throws ConflictingFieldTypeInfoException  {
-		Class<? extends Key<?>> previous = this.schema.put(key, type);
-		if (previous != null && previous != type) {
-			throw new ConflictingFieldTypeInfoException(key, previous, type);
-		}
-	}
-	
-	@Override
-	public Class<? extends Key<?>> getType(int field) {
-		return this.schema.get(field);
-	}
-	
-	@Override
-	public Iterator<Entry<Integer, Class<? extends Key<?>>>> iterator() {
-		return this.schema.entrySet().iterator();
-	}
-	
-	public int getNumTypes() {
-		return this.schema.size();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return this.schema.hashCode() ^ getNumConnectionsThatContributed();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SparseKeySchema) {
-			SparseKeySchema other = (SparseKeySchema) obj;
-			return this.schema.equals(other.schema) && 
-					this.getNumConnectionsThatContributed() == other.getNumConnectionsThatContributed();
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "<" + getNumConnectionsThatContributed() + "> : " + this.schema.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
index 74126f8..483bc51 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
@@ -21,7 +21,6 @@ package org.apache.flink.optimizer.dataproperties;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
 
 import java.io.IOException;
 
@@ -29,8 +28,8 @@ import java.io.IOException;
 public class MockDistribution implements DataDistribution {
 
 	@Override
-	public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-		return new Key<?>[0];
+	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+		return new Object[0];
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index 9ccb899..c3c898d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Value;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -48,11 +48,11 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final List<Record> outList = new ArrayList<Record>();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index a4e4fd5..21e7dc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactor
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -38,11 +38,11 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final CountingOutputCollector output = new CountingOutputCollector();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index bf7d467..f178e6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -44,11 +44,11 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final CountingOutputCollector output = new CountingOutputCollector();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 1699f79..f5a61a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
 
 	public CombineTaskExternalITCase(ExecutionConfig config) {
 		super(config, COMBINE_MEM, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
index 5dc3772..21f6ac2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactor
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -46,11 +46,11 @@ public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Reco
 
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final CountingOutputCollector output = new CountingOutputCollector();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
index 4ce4fd1..753863c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -59,11 +59,11 @@ public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
 	
 	private final List<Record> outList = new ArrayList<>();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index 6ebafee..1d7fdb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -45,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final List<Record> outList = new ArrayList<>();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 904b81e..e05f7d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -51,7 +51,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
 	
 	private final List<Record> outList = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index d9d6a25..dd6b4c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -48,9 +48,9 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,7 +81,7 @@ public class HashTableITCase {
 	{
 		final int[] keyPos = new int[] {0};
 		@SuppressWarnings("unchecked")
-		final Class<? extends Key<?>>[] keyType = (Class<? extends Key<?>>[]) new Class[] { IntValue.class };
+		final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[] { IntValue.class };
 		
 		this.recordBuildSideAccesssor = RecordSerializer.get();
 		this.recordProbeSideAccesssor = RecordSerializer.get();


Mime
View raw message