flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [7/7] flink git commit: [FLINK-3169] Move Record Type Utils from flink-java to flink-runtime/test
Date Mon, 14 Dec 2015 18:48:36 GMT
[FLINK-3169] Move Record Type Utils from flink-java to flink-runtime/test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/066913e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/066913e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/066913e2

Branch: refs/heads/master
Commit: 066913e2360db3c184e12f0104c87b91824b449b
Parents: 4a26cce
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Dec 13 19:50:37 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Dec 14 17:48:42 2015 +0100

----------------------------------------------------------------------
 .../typeutils/record/RecordComparator.java      | 423 -------------------
 .../record/RecordComparatorFactory.java         | 152 -------
 .../typeutils/record/RecordPairComparator.java  | 106 -----
 .../record/RecordPairComparatorFactory.java     | 105 -----
 .../typeutils/record/RecordSerializer.java      | 144 -------
 .../record/RecordSerializerFactory.java         |  75 ----
 .../api/java/typeutils/RecordTypeInfo.java      |  95 -----
 .../api/java/typeutils/RecordTypeInfoTest.java  |  44 --
 .../runtime/operators/CachedMatchTaskTest.java  |   4 +-
 .../operators/CoGroupTaskExternalITCase.java    |   4 +-
 .../runtime/operators/CoGroupTaskTest.java      |   4 +-
 .../operators/CombineTaskExternalITCase.java    |   2 +-
 .../runtime/operators/DataSinkTaskTest.java     |   2 +-
 .../operators/JoinTaskExternalITCase.java       |   4 +-
 .../flink/runtime/operators/JoinTaskTest.java   |   4 +-
 .../operators/LeftOuterJoinTaskTest.java        |   6 -
 .../operators/ReduceTaskExternalITCase.java     |   4 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   4 +-
 .../operators/chaining/ChainTaskTest.java       |   4 +-
 .../runtime/operators/hash/HashTableITCase.java |   4 +-
 ...lockResettableMutableObjectIteratorTest.java |   2 +-
 .../NonReusingBlockResettableIteratorTest.java  |   2 +-
 .../ReusingBlockResettableIteratorTest.java     |   2 +-
 ...lingResettableMutableObjectIteratorTest.java |   2 +-
 .../operators/testutils/DriverTestBase.java     |   4 +-
 .../operators/testutils/TaskTestBase.java       |   2 +-
 .../operators/util/OutputEmitterTest.java       |   4 +-
 .../testutils/recordutils/RecordComparator.java | 423 +++++++++++++++++++
 .../recordutils/RecordComparatorFactory.java    | 152 +++++++
 .../recordutils/RecordPairComparator.java       | 106 +++++
 .../RecordPairComparatorFactory.java            | 105 +++++
 .../testutils/recordutils/RecordSerializer.java | 144 +++++++
 .../recordutils/RecordSerializerFactory.java    |  75 ++++
 .../util/NonReusingKeyGroupedIteratorTest.java  |   2 +-
 .../util/ReusingKeyGroupedIteratorTest.java     |   4 +-
 35 files changed, 1037 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparator.java
deleted file mode 100644
index 605d6a1..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparator.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-
-
-/**
- * Implementation of the {@link TypeComparator} interface for the pact record. Instances of this class
- * are parameterized with which fields are relevant to the comparison. 
- */
-public final class RecordComparator extends TypeComparator<Record> {
-	
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * A sequence of prime numbers to be used for salting the computed hash values.
-	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
-	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
-	 * 
-	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
-	 * @see: http://oeis.org/A068652
-	 */
-	private static final int[] HASH_SALT = new int[] { 
-		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
-		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
-		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
-		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
-	
-	private final int[] keyFields;
-	
-	@SuppressWarnings("rawtypes")
-	private final Key[] keyHolders, transientKeyHolders;
-	
-	private final Record temp1, temp2;
-	
-	private final boolean[] ascending;
-	
-	private final int[] normalizedKeyLengths;
-	
-	private final int numLeadingNormalizableKeys;
-	
-	private final int normalizableKeyPrefixLen;
-	
-
-	/**
-	 * Creates a new comparator that compares Pact Records by the subset of fields as described
-	 * by the given key positions and types. All order comparisons will assume ascending order on all fields.
-	 * 
-	 * @param keyFields The positions of the key fields.
-	 * @param keyTypes The types (classes) of the key fields.
-	 */
-	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes) {
-		this(keyFields, keyTypes, null);
-	}
-	
-	/**
-	 * Creates a new comparator that compares Pact Records by the subset of fields as described
-	 * by the given key positions and types.
-	 * 
-	 * @param keyFields The positions of the key fields.
-	 * @param keyTypes The types (classes) of the key fields.
-	 * @param sortDirection The direction for sorting. A value of <i>true</i> indicates ascending for an attribute,
-	 *                  a value of <i>false</i> indicated descending. If the parameter is <i>null</i>, then
-	 *                  all order comparisons will assume ascending order on all fields.
-	 */
-	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes, boolean[] sortDirection) {
-		this.keyFields = keyFields;
-		
-		// instantiate fields to extract keys into
-		this.keyHolders = new Key[keyTypes.length];
-		this.transientKeyHolders = new Key[keyTypes.length];
-		for (int i = 0; i < keyTypes.length; i++) {
-			if (keyTypes[i] == null) {
-				throw new NullPointerException("Key type " + i + " is null.");
-			}
-			this.keyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-			this.transientKeyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-		}
-		
-		// set up auxiliary fields for normalized key support
-		this.normalizedKeyLengths = new int[keyFields.length];
-		int nKeys = 0;
-		int nKeyLen = 0;
-		boolean inverted = false;
-		for (int i = 0; i < this.keyHolders.length; i++) {
-			Key<?> k = this.keyHolders[i];
-			if (k instanceof NormalizableKey) {
-				if (sortDirection != null) {
-					if (sortDirection[i] && inverted) {
-						break;
-					} else if (i == 0 && !sortDirection[0]) {
-						inverted = true;
-					}
-				}
-				nKeys++;
-				final int len = ((NormalizableKey<?>) k).getMaxNormalizedKeyLen();
-				if (len < 0) {
-					throw new RuntimeException("Data type " + k.getClass().getName() + 
-						" specifies an invalid length for the normalized key: " + len);
-				}
-				this.normalizedKeyLengths[i] = len;
-				nKeyLen += this.normalizedKeyLengths[i];
-				if (nKeyLen < 0) {
-					nKeyLen = Integer.MAX_VALUE;
-					break;
-				}
-			} else {
-				break;
-			}
-		}
-		this.numLeadingNormalizableKeys = nKeys;
-		this.normalizableKeyPrefixLen = nKeyLen;
-		
-		this.temp1 = new Record();
-		this.temp2 = new Record();
-		
-		if (sortDirection != null) {
-			this.ascending = sortDirection;
-		} else {
-			this.ascending = new boolean[keyFields.length];
-			for (int i = 0; i < this.ascending.length; i++) {
-				this.ascending[i] = true;
-			}
-		}
-	}
-	
-	/**
-	 * Copy constructor.
-	 * 
-	 * @param toCopy Comparator to copy.
-	 */
-	private RecordComparator(RecordComparator toCopy) {
-		this.keyFields = toCopy.keyFields;
-		this.keyHolders = new Key[toCopy.keyHolders.length];
-		this.transientKeyHolders = new Key[toCopy.keyHolders.length];
-		
-		try {
-			for (int i = 0; i < this.keyHolders.length; i++) {
-				this.keyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
-				this.transientKeyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
-			}
-		} catch (Exception ex) {
-			// this should never happen, because the classes have been instantiated before. Report for debugging.
-			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
-		}
-		
-		this.normalizedKeyLengths = toCopy.normalizedKeyLengths;
-		this.numLeadingNormalizableKeys = toCopy.numLeadingNormalizableKeys;
-		this.normalizableKeyPrefixLen = toCopy.normalizableKeyPrefixLen;
-		this.ascending = toCopy.ascending;
-		
-		this.temp1 = new Record();
-		this.temp2 = new Record();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public int hash(Record object) {
-		int i = 0;
-		try {
-			int code = 0;
-			for (; i < this.keyFields.length; i++) {
-				code ^= object.getField(this.keyFields[i], this.transientKeyHolders[i]).hashCode();
-				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-			}
-			return code;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(this.keyFields[i]);
-		}
-	}
-
-
-	@Override
-	public void setReference(Record toCompare) {
-		for (int i = 0; i < this.keyFields.length; i++) {
-			if (!toCompare.getFieldInto(this.keyFields[i], this.keyHolders[i])) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			}
-		}
-	}
-
-
-	@Override
-	public boolean equalToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields.length; i++) {
-			final Key<?> k = candidate.getField(this.keyFields[i], this.transientKeyHolders[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			} else if (!k.equals(this.keyHolders[i])) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-
-	@Override
-	public int compareToReference(TypeComparator<Record> referencedAccessors) {
-		final RecordComparator pra = (RecordComparator) referencedAccessors;
-		
-		for (int i = 0; i < this.keyFields.length; i++) {
-			@SuppressWarnings("unchecked")
-			final int comp = pra.keyHolders[i].compareTo(this.keyHolders[i]);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public int compare(Record first, Record second) {
-		int i = 0;
-		try {
-			for (; i < this.keyFields.length; i++) {
-				Key k1 = first.getField(this.keyFields[i], this.keyHolders[i]);
-				Key k2 = second.getField(this.keyFields[i], this.transientKeyHolders[i]);
-				int cmp = k1.compareTo(k2);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		}
-		catch (NullPointerException e) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
-		this.temp1.read(source1);
-		this.temp2.read(source2);
-		
-		for (int i = 0; i < this.keyFields.length; i++) {
-			@SuppressWarnings("rawtypes")
-			final Key k1 = this.temp1.getField(this.keyFields[i], this.keyHolders[i]);
-			@SuppressWarnings("rawtypes")
-			final Key k2 = this.temp2.getField(this.keyFields[i], this.transientKeyHolders[i]);
-			
-			if (k1 == null || k2 == null) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			}
-			
-			@SuppressWarnings("unchecked")
-			final int comp = k1.compareTo(k2);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.numLeadingNormalizableKeys > 0;
-	}
-
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.normalizableKeyPrefixLen;
-	}
-	
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.numLeadingNormalizableKeys < this.keyFields.length ||
-				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-				this.normalizableKeyPrefixLen > keyBytes;
-	}
-
-	@Override
-	public void putNormalizedKey(Record record, MemorySegment target, int offset, int numBytes) {
-		int i = 0;
-		try {
-			for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
-			{
-				int len = this.normalizedKeyLengths[i]; 
-				len = numBytes >= len ? len : numBytes;
-				((NormalizableKey<?>) record.getField(this.keyFields[i], this.transientKeyHolders[i])).copyNormalizedKey(target, offset, len);
-				numBytes -= len;
-				offset += len;
-			}
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-	}
-	
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return !this.ascending[0];
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(Record record, DataOutputView target) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Record readWithKeyDenormalization(Record reuse, DataInputView source) {
-		throw new UnsupportedOperationException();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public RecordComparator duplicate() {
-		return new RecordComparator(this);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                           Non Standard Comparator Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public final int[] getKeyPositions() {
-		return this.keyFields;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public final Class<? extends Key<?>>[] getKeyTypes() {
-		final Class<? extends Key<?>>[] keyTypes = new Class[this.keyHolders.length];
-		for (int i = 0; i < keyTypes.length; i++) {
-			keyTypes[i] = (Class<? extends Key<?>>) this.keyHolders[i].getClass();
-		}
-		return keyTypes;
-	}
-	
-	public final Key<?>[] getKeysAsCopy(Record record) {
-		try {
-			final Key<?>[] keys = new Key[this.keyFields.length];
-			for (int i = 0; i < keys.length; i++) {
-				keys[i] = this.keyHolders[i].getClass().newInstance();
-			}
-			if(!record.getFieldsInto(this.keyFields, keys)) {
-				throw new RuntimeException("Could not extract keys from record.");
-			}
-			return keys;
-		} catch (Exception ex) {
-			// this should never happen, because the classes have been instantiated before. Report for debugging.
-			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
-		}
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		throw new UnsupportedOperationException("Record does not support extactKeys and " +
-				"getComparators. This cannot be used with the GenericPairComparator.");
-	}
-
-
-	@Override
-	public TypeComparator<?>[] getFlatComparators() {
-		throw new UnsupportedOperationException("Record does not support extactKeys and " +
-				"getComparators. This cannot be used with the GenericPairComparator.");
-	}
-
-	@Override
-	public boolean supportsCompareAgainstReference() {
-		return true;
-	}
-
-	@Override
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public final int compareAgainstReference(Comparable[] keys) {
-		for (int i = 0; i < this.keyFields.length; i++)
-		{
-			final int comp = keys[i].compareTo(this.keyHolders[i]);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparatorFactory.java
deleted file mode 100644
index 7de758b..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordComparatorFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-/**
- * A factory for a {@link org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}. The comparator uses a subset of
- * the fields for the comparison. That subset of fields (positions and types) is read from the
- * supplied configuration.
- */
-public class RecordComparatorFactory implements TypeComparatorFactory<Record> {
-	
-	private static final String NUM_KEYS = "numkeys";
-	
-	private static final String KEY_POS_PREFIX = "keypos.";
-	
-	private static final String KEY_CLASS_PREFIX = "keyclass.";
-	
-	private static final String KEY_SORT_DIRECTION_PREFIX = "key-direction.";
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private int[] positions;
-	
-	private Class<? extends Key<?>>[] types;
-	
-	private boolean[] sortDirections;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public RecordComparatorFactory() {
-		// do nothing, allow to be configured via config
-	}
-	
-	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types) {
-		this(positions, types, null);
-	}
-	
-	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types, boolean[] sortDirections) {
-		if (positions == null || types == null) {
-			throw new NullPointerException();
-		}
-		if (positions.length != types.length) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.positions = positions;
-		this.types = types;
-		
-		if (sortDirections == null) {
-			this.sortDirections = new boolean[positions.length];
-			Arrays.fill(this.sortDirections, true);
-		} else if (sortDirections.length != positions.length) {
-			throw new IllegalArgumentException();
-		} else {
-			this.sortDirections = sortDirections;
-		}
-	}
-
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		for (int i = 0; i < this.positions.length; i++) {
-			if (this.positions[i] < 0) {
-				throw new IllegalArgumentException("The key position " + i + " is invalid: " + this.positions[i]);
-			}
-			if (this.types[i] == null || !Key.class.isAssignableFrom(this.types[i])) {
-				throw new IllegalArgumentException("The key type " + i + " is null or not implenting the interface " + 
-					Key.class.getName() + ".");
-			}
-		}
-		
-		// write the config
-		config.setInteger(NUM_KEYS, this.positions.length);
-		for (int i = 0; i < this.positions.length; i++) {
-			config.setInteger(KEY_POS_PREFIX + i, this.positions[i]);
-			config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName());
-			config.setBoolean(KEY_SORT_DIRECTION_PREFIX + i, this.sortDirections[i]);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		// figure out how many key fields there are
-		final int numKeyFields = config.getInteger(NUM_KEYS, -1);
-		if (numKeyFields < 0) {
-			throw new IllegalConfigurationException("The number of keys for the comparator is invalid: " + numKeyFields);
-		}
-		
-		final int[] positions = new int[numKeyFields];
-		final Class<? extends Key<?>>[] types = new Class[numKeyFields];
-		final boolean[] direction = new boolean[numKeyFields];
-		
-		// read the individual key positions and types
-		for (int i = 0; i < numKeyFields; i++) {
-			// next key position
-			final int p = config.getInteger(KEY_POS_PREFIX + i, -1);
-			if (p >= 0) {
-				positions[i] = p;
-			} else {
-				throw new IllegalConfigurationException("Contained invalid position for key no positions for keys.");
-			}
-			
-			// next key type
-			final String name = config.getString(KEY_CLASS_PREFIX + i, null);
-			if (name != null) {
-				types[i] = (Class<? extends Key<?>>) Class.forName(name, true, cl).asSubclass(Key.class);
-			} else {
-				throw new IllegalConfigurationException("The key type (" + i +
-					") for the comparator is null"); 
-			}
-			
-			// next key sort direction
-			direction[i] = config.getBoolean(KEY_SORT_DIRECTION_PREFIX + i, true);
-		}
-		
-		this.positions = positions;
-		this.types = types;
-		this.sortDirections = direction;
-	}
-	
-
-	@Override
-	public RecordComparator createComparator() {
-		return new RecordComparator(this.positions, this.types, this.sortDirections);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparator.java
deleted file mode 100644
index 7b95e55..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparator.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-
-
-/**
- * Implementation of the {@link TypePairComparator} interface for Pact Records. The equality is established on a set of
- * key fields. The indices of the key fields may be different on the reference and candidate side.
- */
-public class RecordPairComparator extends TypePairComparator<Record, Record>  {
-	
-	private final int[] keyFields1, keyFields2;			// arrays with the positions of the keys in the records
-	
-	@SuppressWarnings("rawtypes")
-	private final Key[] keyHolders1, keyHolders2;		// arrays with mutable objects for the key types
-	
-	
-	public RecordPairComparator(int[] keyFieldsReference, int[] keyFieldsCandidate, Class<? extends Key<?>>[] keyTypes) {
-		if (keyFieldsReference.length != keyFieldsCandidate.length || keyFieldsCandidate.length != keyTypes.length) {
-			throw new IllegalArgumentException(
-				"The arrays describing the key positions and types must be of the same length.");
-		}
-		this.keyFields1 = keyFieldsReference;
-		this.keyFields2 = keyFieldsCandidate;
-		
-		// instantiate fields to extract keys into
-		this.keyHolders1 = new Key[keyTypes.length];
-		this.keyHolders2 = new Key[keyTypes.length];
-		
-		for (int i = 0; i < keyTypes.length; i++) {
-			if (keyTypes[i] == null) {
-				throw new NullPointerException("Key type " + i + " is null.");
-			}
-			this.keyHolders1[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-			this.keyHolders2[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public void setReference(Record reference) {
-		for (int i = 0; i < this.keyFields1.length; i++) {
-			if (!reference.getFieldInto(this.keyFields1[i], this.keyHolders1[i])) {
-				throw new NullKeyFieldException(this.keyFields1[i]);
-			}
-		}
-	}
-
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public boolean equalToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields2.length; i++) {
-			final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields2[i]);
-			} else if (!k.equals(this.keyHolders1[i])) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public int compareToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields2.length; i++) {
-			final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields2[i]);
-			} else {
-				final int comp = k.compareTo(this.keyHolders1[i]);
-				if (comp != 0) {
-					return comp;
-				}
-			}
-		}
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparatorFactory.java
deleted file mode 100644
index 752540e..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordPairComparatorFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-/**
- * A factory for a {@link TypePairComparator} for {@link Record}. The comparator uses a subset of
- * the fields for the comparison. That subset of fields (positions and types) is read from the
- * supplied configuration.
- */
-public class RecordPairComparatorFactory implements TypePairComparatorFactory<Record, Record> {
-	
-	private static final RecordPairComparatorFactory INSTANCE = new RecordPairComparatorFactory();
-	
-	/**
-	 * Gets an instance of the comparator factory. The instance is shared, since the factory is a
-	 * stateless class. 
-	 * 
-	 * @return An instance of the comparator factory.
-	 */
-	public static final RecordPairComparatorFactory get() {
-		return INSTANCE;
-	}
-
-	@Override
-	public TypePairComparator<Record, Record> createComparator12(
-			TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
-	{
-		if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) {
-			throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators.");
-		}
-		final RecordComparator prc1 = (RecordComparator) comparator1;
-		final RecordComparator prc2 = (RecordComparator) comparator2;
-		
-		final int[] pos1 = prc1.getKeyPositions();
-		final int[] pos2 = prc2.getKeyPositions();
-		
-		final Class<? extends Key<?>>[] types1 = prc1.getKeyTypes();
-		final Class<? extends Key<?>>[] types2 = prc2.getKeyTypes();
-		
-		checkComparators(pos1, pos2, types1, types2);
-		
-		return new RecordPairComparator(pos1, pos2, types1);
-	}
-
-	@Override
-	public TypePairComparator<Record, Record> createComparator21(
-		TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
-	{
-		if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) {
-			throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators.");
-		}
-		final RecordComparator prc1 = (RecordComparator) comparator1;
-		final RecordComparator prc2 = (RecordComparator) comparator2;
-		
-		final int[] pos1 = prc1.getKeyPositions();
-		final int[] pos2 = prc2.getKeyPositions();
-		
-		final Class<? extends Key<?>>[] types1 = prc1.getKeyTypes();
-		final Class<? extends Key<?>>[] types2 = prc2.getKeyTypes();
-		
-		checkComparators(pos1, pos2, types1, types2);
-		
-		return new RecordPairComparator(pos2, pos1, types1);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	private static final void checkComparators(int[] pos1, int[] pos2, 
-							Class<? extends Key<?>>[] types1, Class<? extends Key<?>>[] types2)
-	{
-		if (pos1.length != pos2.length || types1.length != types2.length) {
-			throw new IllegalArgumentException(
-				"The given pair of RecordComparators does not operate on the same number of fields.");
-		}
-		for (int i = 0; i < types1.length; i++) {
-			if (!types1[i].equals(types2[i])) {
-				throw new IllegalArgumentException(
-				"The given pair of RecordComparators does not operates on different data types.");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
deleted file mode 100644
index 6ffa0df..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Record;
-
-
-/**
- * Implementation of the (de)serialization and copying logic for the {@link Record}.
- */
-public final class RecordSerializer extends TypeSerializer<Record> {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final RecordSerializer INSTANCE = new RecordSerializer(); // singleton instance
-	
-	private static final int MAX_BIT = 0x80;	// byte where only the most significant bit is set
-	
-	// --------------------------------------------------------------------------------------------
-
-	public static RecordSerializer get() {
-		return INSTANCE;
-	}
-	
-	/**
-	 * Creates a new instance of the RecordSerializers. Private to prevent instantiation.
-	 */
-	private RecordSerializer() {}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public RecordSerializer duplicate() {
-		// does not hold state, so just return ourselves
-		return this;
-	}
-	
-	@Override
-	public Record createInstance() {
-		return new Record(); 
-	}
-
-	@Override
-	public Record copy(Record from) {
-		return from.createCopy();
-	}
-	
-	@Override
-	public Record copy(Record from, Record reuse) {
-		from.copyTo(reuse);
-		return reuse;
-	}
-	
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void serialize(Record record, DataOutputView target) throws IOException {
-		record.serialize(target);
-	}
-
-	@Override
-	public Record deserialize(DataInputView source) throws IOException {
-		return deserialize(new Record(), source);
-	}
-	
-	@Override
-	public Record deserialize(Record target, DataInputView source) throws IOException {
-		target.deserialize(source);
-		return target;
-	}
-	
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int val = source.readUnsignedByte();
-		target.writeByte(val);
-		
-		if (val >= MAX_BIT) {
-			int shift = 7;
-			int curr;
-			val = val & 0x7f;
-			while ((curr = source.readUnsignedByte()) >= MAX_BIT) {
-				target.writeByte(curr);
-				val |= (curr & 0x7f) << shift;
-				shift += 7;
-			}
-			target.writeByte(curr);
-			val |= curr << shift;
-		}
-		
-		target.write(source, val);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof RecordSerializer) {
-			RecordSerializer other = (RecordSerializer) obj;
-			return other.canEqual(this);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof RecordSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return RecordSerializer.class.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializerFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializerFactory.java
deleted file mode 100644
index 306b063..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializerFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.typeutils.record;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-
-/**
- * A factory that create a serializer for the {@link Record} data type.
- */
-public class RecordSerializerFactory implements TypeSerializerFactory<Record> {
-	
-	private static final RecordSerializerFactory INSTANCE = new RecordSerializerFactory();
-	
-	/**
-	 * Gets an instance of the serializer factory. The instance is shared, since the factory is a
-	 * stateless class. 
-	 * 
-	 * @return An instance of the serializer factory.
-	 */
-	public static final RecordSerializerFactory get() {
-		return INSTANCE;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) {}
-	
-
-	@Override
-	public TypeSerializer<Record> getSerializer() {
-		return RecordSerializer.get();
-	}
-
-	@Override
-	public Class<Record> getDataType() {
-		return Record.class;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 31;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == RecordSerializerFactory.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
deleted file mode 100644
index e9ce102..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.types.Record;
-
-/**
- * Type information for the {@link Record} data type.
- */
-public class RecordTypeInfo extends TypeInformation<Record> {
-
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-	
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@Override
-	public Class<Record> getTypeClass() {
-		return Record.class;
-	}
-	
-	@Override
-	public boolean isKeyType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<Record> createSerializer(ExecutionConfig config) {
-		return RecordSerializer.get();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return Record.class.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof RecordTypeInfo;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof RecordTypeInfo) {
-			RecordTypeInfo recordTypeInfo = (RecordTypeInfo) obj;
-			return recordTypeInfo.canEqual(this);
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "RecordType";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
deleted file mode 100644
index 7aeb062..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class RecordTypeInfoTest extends TestLogger {
-
-	@Test
-	public void testRecordTypeInfoEquality() {
-		RecordTypeInfo tpeInfo1 = new RecordTypeInfo();
-		RecordTypeInfo tpeInfo2 = new RecordTypeInfo();
-
-		assertEquals(tpeInfo1, tpeInfo2);
-		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
-	}
-
-	@Test
-	public void testRecordTypeInfoInequality() {
-		RecordTypeInfo tpeInfo1 = new RecordTypeInfo();
-		MissingTypeInfo tpeInfo2 = new MissingTypeInfo("foobar");
-
-		assertNotEquals(tpeInfo1, tpeInfo2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index c93c302..9ccb899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 7f96954..a4e4fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index 9c0f075..bf7d467 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase.MockCoGroupStub;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 800bca7..e162d7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 6221706..eb2c8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
index 5b2e6eb..5dc3772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
index ecde59e..4ce4fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
index ad11768..266723a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
@@ -21,22 +21,16 @@ package org.apache.flink.runtime.operators;
 
 import com.google.common.base.Throwables;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DelayingIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
-import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.Record;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertFalse;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index 415b6bc..a00aea3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -27,8 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 8bc7fe5..531d8ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -28,8 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 542812c..4d8e0de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index d0a6fc6..4afa114 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIteratorTest.java
index bfb7cf2..00011ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIteratorTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
index d5de75e..01dbe18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIteratorTest.java
index 7dbd2fb..ea4b667 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIteratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.operators.resettable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index c64db54..ef48a1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.operators.resettable;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index c442940..ab58cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -32,8 +32,8 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 63f54ea..458f0a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index 6bf5dcc..4623646 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparator.java
new file mode 100644
index 0000000..41810bf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparator.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.InstantiationUtil;
+
+
+/**
+ * Implementation of the {@link TypeComparator} interface for the pact record. Instances of this class
+ * are parameterized with the fields that are relevant to the comparison.
+ */
+public final class RecordComparator extends TypeComparator<Record> {
+	
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * A sequence of prime numbers to be used for salting the computed hash values.
+	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
+	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
+	 * 
+	 * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">List of prime numbers</a>
+	 * @see <a href="http://oeis.org/A068652">OEIS sequence A068652</a>
+	 */
+	private static final int[] HASH_SALT = new int[] { 
+		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
+		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
+		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
+		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
+	
+	private final int[] keyFields;
+	
+	@SuppressWarnings("rawtypes")
+	private final Key[] keyHolders, transientKeyHolders;
+	
+	private final Record temp1, temp2;
+	
+	private final boolean[] ascending;
+	
+	private final int[] normalizedKeyLengths;
+	
+	private final int numLeadingNormalizableKeys;
+	
+	private final int normalizableKeyPrefixLen;
+	
+
+	/**
+	 * Creates a new comparator that compares Pact Records by the subset of fields as described
+	 * by the given key positions and types. All order comparisons will assume ascending order on all fields.
+	 * 
+	 * @param keyFields The positions of the key fields.
+	 * @param keyTypes The types (classes) of the key fields.
+	 */
+	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes) {
+		this(keyFields, keyTypes, null);
+	}
+	
+	/**
+	 * Creates a new comparator that compares Pact Records by the subset of fields as described
+	 * by the given key positions and types.
+	 * 
+	 * @param keyFields The positions of the key fields.
+	 * @param keyTypes The types (classes) of the key fields.
+	 * @param sortDirection The direction for sorting. A value of <i>true</i> indicates ascending for an attribute,
+	 *                  a value of <i>false</i> indicates descending. If the parameter is <i>null</i>, then
+	 *                  all order comparisons will assume ascending order on all fields.
+	 */
+	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes, boolean[] sortDirection) {
+		this.keyFields = keyFields;
+		
+		// instantiate fields to extract keys into
+		this.keyHolders = new Key[keyTypes.length];
+		this.transientKeyHolders = new Key[keyTypes.length];
+		for (int i = 0; i < keyTypes.length; i++) {
+			if (keyTypes[i] == null) {
+				throw new NullPointerException("Key type " + i + " is null.");
+			}
+			this.keyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
+			this.transientKeyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
+		}
+		
+		// set up auxiliary fields for normalized key support
+		this.normalizedKeyLengths = new int[keyFields.length];
+		int nKeys = 0;
+		int nKeyLen = 0;
+		boolean inverted = false;
+		for (int i = 0; i < this.keyHolders.length; i++) {
+			Key<?> k = this.keyHolders[i];
+			if (k instanceof NormalizableKey) {
+				if (sortDirection != null) {
+					if (sortDirection[i] && inverted) {
+						break;
+					} else if (i == 0 && !sortDirection[0]) {
+						inverted = true;
+					}
+				}
+				nKeys++;
+				final int len = ((NormalizableKey<?>) k).getMaxNormalizedKeyLen();
+				if (len < 0) {
+					throw new RuntimeException("Data type " + k.getClass().getName() + 
+						" specifies an invalid length for the normalized key: " + len);
+				}
+				this.normalizedKeyLengths[i] = len;
+				nKeyLen += this.normalizedKeyLengths[i];
+				if (nKeyLen < 0) {
+					nKeyLen = Integer.MAX_VALUE;
+					break;
+				}
+			} else {
+				break;
+			}
+		}
+		this.numLeadingNormalizableKeys = nKeys;
+		this.normalizableKeyPrefixLen = nKeyLen;
+		
+		this.temp1 = new Record();
+		this.temp2 = new Record();
+		
+		if (sortDirection != null) {
+			this.ascending = sortDirection;
+		} else {
+			this.ascending = new boolean[keyFields.length];
+			for (int i = 0; i < this.ascending.length; i++) {
+				this.ascending[i] = true;
+			}
+		}
+	}
+	
+	/**
+	 * Copy constructor.
+	 * 
+	 * @param toCopy Comparator to copy.
+	 */
+	private RecordComparator(RecordComparator toCopy) {
+		this.keyFields = toCopy.keyFields;
+		this.keyHolders = new Key[toCopy.keyHolders.length];
+		this.transientKeyHolders = new Key[toCopy.keyHolders.length];
+		
+		try {
+			for (int i = 0; i < this.keyHolders.length; i++) {
+				this.keyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
+				this.transientKeyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
+			}
+		} catch (Exception ex) {
+			// this should never happen, because the classes have been instantiated before. Report for debugging.
+			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
+		}
+		
+		this.normalizedKeyLengths = toCopy.normalizedKeyLengths;
+		this.numLeadingNormalizableKeys = toCopy.numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = toCopy.normalizableKeyPrefixLen;
+		this.ascending = toCopy.ascending;
+		
+		this.temp1 = new Record();
+		this.temp2 = new Record();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public int hash(Record object) {
+		int i = 0;
+		try {
+			int code = 0;
+			for (; i < this.keyFields.length; i++) {
+				code ^= object.getField(this.keyFields[i], this.transientKeyHolders[i]).hashCode();
+				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
+			}
+			return code;
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyFields[i]);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(this.keyFields[i]);
+		}
+	}
+
+
+	@Override
+	public void setReference(Record toCompare) {
+		for (int i = 0; i < this.keyFields.length; i++) {
+			if (!toCompare.getFieldInto(this.keyFields[i], this.keyHolders[i])) {
+				throw new NullKeyFieldException(this.keyFields[i]);
+			}
+		}
+	}
+
+
+	@Override
+	public boolean equalToReference(Record candidate) {
+		for (int i = 0; i < this.keyFields.length; i++) {
+			final Key<?> k = candidate.getField(this.keyFields[i], this.transientKeyHolders[i]);
+			if (k == null) {
+				throw new NullKeyFieldException(this.keyFields[i]);
+			} else if (!k.equals(this.keyHolders[i])) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+
+	@Override
+	public int compareToReference(TypeComparator<Record> referencedAccessors) {
+		final RecordComparator pra = (RecordComparator) referencedAccessors;
+		
+		for (int i = 0; i < this.keyFields.length; i++) {
+			@SuppressWarnings("unchecked")
+			final int comp = pra.keyHolders[i].compareTo(this.keyHolders[i]);
+			if (comp != 0) {
+				return this.ascending[i] ? comp : -comp;
+			}
+		}
+		return 0;
+	}
+	
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public int compare(Record first, Record second) {
+		int i = 0;
+		try {
+			for (; i < this.keyFields.length; i++) {
+				Key k1 = first.getField(this.keyFields[i], this.keyHolders[i]);
+				Key k2 = second.getField(this.keyFields[i], this.transientKeyHolders[i]);
+				int cmp = k1.compareTo(k2);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullPointerException e) {
+			throw new NullKeyFieldException(this.keyFields[i]);
+		}
+	}
+	
+	@Override
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
+		this.temp1.read(source1);
+		this.temp2.read(source2);
+		
+		for (int i = 0; i < this.keyFields.length; i++) {
+			@SuppressWarnings("rawtypes")
+			final Key k1 = this.temp1.getField(this.keyFields[i], this.keyHolders[i]);
+			@SuppressWarnings("rawtypes")
+			final Key k2 = this.temp2.getField(this.keyFields[i], this.transientKeyHolders[i]);
+			
+			if (k1 == null || k2 == null) {
+				throw new NullKeyFieldException(this.keyFields[i]);
+			}
+			
+			@SuppressWarnings("unchecked")
+			final int comp = k1.compareTo(k2);
+			if (comp != 0) {
+				return this.ascending[i] ? comp : -comp;
+			}
+		}
+		return 0;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return this.numLeadingNormalizableKeys > 0;
+	}
+
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return this.normalizableKeyPrefixLen;
+	}
+	
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return this.numLeadingNormalizableKeys < this.keyFields.length ||
+				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+				this.normalizableKeyPrefixLen > keyBytes;
+	}
+
+	@Override
+	public void putNormalizedKey(Record record, MemorySegment target, int offset, int numBytes) {
+		int i = 0;
+		try {
+			for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
+			{
+				int len = this.normalizedKeyLengths[i]; 
+				len = numBytes >= len ? len : numBytes;
+				((NormalizableKey<?>) record.getField(this.keyFields[i], this.transientKeyHolders[i])).copyNormalizedKey(target, offset, len);
+				numBytes -= len;
+				offset += len;
+			}
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyFields[i]);
+		}
+	}
+	
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !this.ascending[0];
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(Record record, DataOutputView target) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Record readWithKeyDenormalization(Record reuse, DataInputView source) {
+		throw new UnsupportedOperationException();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public RecordComparator duplicate() {
+		return new RecordComparator(this);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                           Non Standard Comparator Methods
+	// --------------------------------------------------------------------------------------------
+	
+	public final int[] getKeyPositions() {
+		return this.keyFields;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public final Class<? extends Key<?>>[] getKeyTypes() {
+		final Class<? extends Key<?>>[] keyTypes = new Class[this.keyHolders.length];
+		for (int i = 0; i < keyTypes.length; i++) {
+			keyTypes[i] = (Class<? extends Key<?>>) this.keyHolders[i].getClass();
+		}
+		return keyTypes;
+	}
+	
+	public final Key<?>[] getKeysAsCopy(Record record) {
+		try {
+			final Key<?>[] keys = new Key[this.keyFields.length];
+			for (int i = 0; i < keys.length; i++) {
+				keys[i] = this.keyHolders[i].getClass().newInstance();
+			}
+			if(!record.getFieldsInto(this.keyFields, keys)) {
+				throw new RuntimeException("Could not extract keys from record.");
+			}
+			return keys;
+		} catch (Exception ex) {
+			// this should never happen, because the classes have been instantiated before. Report for debugging.
+			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
+		}
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		throw new UnsupportedOperationException("Record does not support extactKeys and " +
+				"getComparators. This cannot be used with the GenericPairComparator.");
+	}
+
+
+	@Override
+	public TypeComparator<?>[] getFlatComparators() {
+		throw new UnsupportedOperationException("Record does not support extactKeys and " +
+				"getComparators. This cannot be used with the GenericPairComparator.");
+	}
+
+	@Override
+	public boolean supportsCompareAgainstReference() {
+		return true;
+	}
+
+	@Override
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public final int compareAgainstReference(Comparable[] keys) {
+		for (int i = 0; i < this.keyFields.length; i++)
+		{
+			final int comp = keys[i].compareTo(this.keyHolders[i]);
+			if (comp != 0) {
+				return this.ascending[i] ? comp : -comp;
+			}
+		}
+		return 0;
+	}
+}


Mime
View raw message