flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/7] flink git commit: [FLINK-3169] Move Record Type Utils from flink-java to flink-runtime/test
Date Mon, 14 Dec 2015 18:48:35 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
new file mode 100644
index 0000000..3ea7106
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+
+/**
+ * A factory for a {@link org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}.
+ * The comparator uses a subset of the fields for the comparison. That subset of fields (positions,
+ * types and sort directions) is either handed to the constructor or read from the supplied
+ * configuration.
+ */
+public class RecordComparatorFactory implements TypeComparatorFactory<Record> {
+	
+	/** Config key under which the number of key fields is stored. */
+	private static final String NUM_KEYS = "numkeys";
+	
+	/** Config key prefix for the position of the i-th key field (the key index is appended). */
+	private static final String KEY_POS_PREFIX = "keypos.";
+	
+	/** Config key prefix for the class name of the i-th key field (the key index is appended). */
+	private static final String KEY_CLASS_PREFIX = "keyclass.";
+	
+	/** Config key prefix for the sort direction flag of the i-th key field (the key index is appended). */
+	private static final String KEY_SORT_DIRECTION_PREFIX = "key-direction.";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private int[] positions;
+	
+	private Class<? extends Key<?>>[] types;
+	
+	private boolean[] sortDirections;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates an unconfigured factory. The key fields must be supplied later through
+	 * {@link #readParametersFromConfig(Configuration, ClassLoader)}.
+	 */
+	public RecordComparatorFactory() {
+		// do nothing, allow to be configured via config
+	}
+	
+	/**
+	 * Creates a factory for the given key fields; every sort direction flag defaults to {@code true}.
+	 *
+	 * @param positions The positions of the key fields in the record.
+	 * @param types The types of the key fields, one per position.
+	 */
+	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types) {
+		this(positions, types, null);
+	}
+	
+	/**
+	 * Creates a factory for the given key fields and sort directions.
+	 *
+	 * @param positions The positions of the key fields in the record.
+	 * @param types The types of the key fields, one per position.
+	 * @param sortDirections The sort direction flag per key field, or {@code null} to use
+	 *                       {@code true} for every key field.
+	 * @throws NullPointerException If {@code positions} or {@code types} is {@code null}.
+	 * @throws IllegalArgumentException If the arrays do not all have the same length.
+	 */
+	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types, boolean[] sortDirections) {
+		if (positions == null || types == null) {
+			throw new NullPointerException("The key positions and the key types must not be null.");
+		}
+		if (positions.length != types.length) {
+			throw new IllegalArgumentException("The number of key positions (" + positions.length +
+				") differs from the number of key types (" + types.length + ").");
+		}
+		
+		this.positions = positions;
+		this.types = types;
+		
+		if (sortDirections == null) {
+			// no directions given: default every key to 'true'
+			this.sortDirections = new boolean[positions.length];
+			Arrays.fill(this.sortDirections, true);
+		} else if (sortDirections.length != positions.length) {
+			throw new IllegalArgumentException("The number of sort directions (" + sortDirections.length +
+				") differs from the number of key positions (" + positions.length + ").");
+		} else {
+			this.sortDirections = sortDirections;
+		}
+	}
+
+
+	/**
+	 * Validates the key fields of this factory and writes them (count, positions, type names and
+	 * sort directions) into the given configuration.
+	 *
+	 * @param config The configuration to write the parameters to.
+	 * @throws IllegalArgumentException If a key position is negative, or a key type is
+	 *                                  {@code null} or does not implement {@link Key}.
+	 */
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+		// validate everything first, so that nothing is written for an invalid setup
+		for (int i = 0; i < this.positions.length; i++) {
+			if (this.positions[i] < 0) {
+				throw new IllegalArgumentException("The key position " + i + " is invalid: " + this.positions[i]);
+			}
+			if (this.types[i] == null || !Key.class.isAssignableFrom(this.types[i])) {
+				throw new IllegalArgumentException("The key type " + i + " is null or not implementing the interface " + 
+					Key.class.getName() + ".");
+			}
+		}
+		
+		// write the config
+		config.setInteger(NUM_KEYS, this.positions.length);
+		for (int i = 0; i < this.positions.length; i++) {
+			config.setInteger(KEY_POS_PREFIX + i, this.positions[i]);
+			config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName());
+			config.setBoolean(KEY_SORT_DIRECTION_PREFIX + i, this.sortDirections[i]);
+		}
+	}
+
+	/**
+	 * Reads the key fields (count, positions, types and sort directions) from the given
+	 * configuration and replaces the current state of this factory with them. A missing sort
+	 * direction entry defaults to {@code true}.
+	 *
+	 * @param config The configuration to read the parameters from.
+	 * @param cl The class loader used to resolve the key type classes.
+	 * @throws ClassNotFoundException If a key type class cannot be found by the class loader.
+	 * @throws IllegalConfigurationException If the number of keys, a key position or a key type
+	 *                                       is missing or invalid.
+	 * @throws ClassCastException If a configured key class is not a subclass of {@link Key}.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+		// figure out how many key fields there are
+		final int numKeyFields = config.getInteger(NUM_KEYS, -1);
+		if (numKeyFields < 0) {
+			throw new IllegalConfigurationException("The number of keys for the comparator is invalid: " + numKeyFields);
+		}
+		
+		final int[] positions = new int[numKeyFields];
+		final Class<? extends Key<?>>[] types = new Class[numKeyFields];
+		final boolean[] direction = new boolean[numKeyFields];
+		
+		// read the individual key positions and types
+		for (int i = 0; i < numKeyFields; i++) {
+			// next key position; -1 signals a missing entry
+			final int p = config.getInteger(KEY_POS_PREFIX + i, -1);
+			if (p >= 0) {
+				positions[i] = p;
+			} else {
+				throw new IllegalConfigurationException("The key position (" + i +
+					") for the comparator is invalid or missing: " + p);
+			}
+			
+			// next key type
+			final String name = config.getString(KEY_CLASS_PREFIX + i, null);
+			if (name != null) {
+				types[i] = (Class<? extends Key<?>>) Class.forName(name, true, cl).asSubclass(Key.class);
+			} else {
+				throw new IllegalConfigurationException("The key type (" + i +
+					") for the comparator is null"); 
+			}
+			
+			// next key sort direction
+			direction[i] = config.getBoolean(KEY_SORT_DIRECTION_PREFIX + i, true);
+		}
+		
+		// only publish the new state after everything was read successfully
+		this.positions = positions;
+		this.types = types;
+		this.sortDirections = direction;
+	}
+	
+
+	/**
+	 * Creates a new {@link RecordComparator} from the key positions, key types and sort
+	 * directions currently held by this factory.
+	 *
+	 * @return A new comparator instance.
+	 */
+	@Override
+	public RecordComparator createComparator() {
+		return new RecordComparator(this.positions, this.types, this.sortDirections);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparator.java
new file mode 100644
index 0000000..fd51ce1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.InstantiationUtil;
+
+
+/**
+ * A {@link TypePairComparator} for {@link Record}s that compares a candidate record against a
+ * previously set reference record on a set of key fields. The positions of the key fields may
+ * differ between the reference side and the candidate side.
+ */
+public class RecordPairComparator extends TypePairComparator<Record, Record>  {
+	
+	/** Positions of the key fields in the reference records. */
+	private final int[] keyFields1;
+	
+	/** Positions of the key fields in the candidate records. */
+	private final int[] keyFields2;
+	
+	/** Mutable key objects that hold the key fields of the current reference. */
+	@SuppressWarnings("rawtypes")
+	private final Key[] keyHolders1;
+	
+	/** Mutable key objects into which the key fields of a candidate are read. */
+	@SuppressWarnings("rawtypes")
+	private final Key[] keyHolders2;
+	
+	
+	/**
+	 * Creates a pair comparator for the given key positions and key types.
+	 *
+	 * @param keyFieldsReference The positions of the key fields in the reference records.
+	 * @param keyFieldsCandidate The positions of the key fields in the candidate records.
+	 * @param keyTypes The types of the key fields.
+	 * @throws IllegalArgumentException If the three arrays are not of the same length.
+	 * @throws NullPointerException If one of the key types is {@code null}.
+	 */
+	public RecordPairComparator(int[] keyFieldsReference, int[] keyFieldsCandidate, Class<? extends Key<?>>[] keyTypes) {
+		final boolean sameLength = keyFieldsReference.length == keyFieldsCandidate.length
+				&& keyFieldsCandidate.length == keyTypes.length;
+		if (!sameLength) {
+			throw new IllegalArgumentException(
+				"The arrays describing the key positions and types must be of the same length.");
+		}
+		
+		this.keyFields1 = keyFieldsReference;
+		this.keyFields2 = keyFieldsCandidate;
+		
+		// one holder per key and side, so reference and candidate keys never share an object
+		final int numKeys = keyTypes.length;
+		this.keyHolders1 = new Key[numKeys];
+		this.keyHolders2 = new Key[numKeys];
+		
+		for (int idx = 0; idx < numKeys; idx++) {
+			final Class<? extends Key<?>> keyType = keyTypes[idx];
+			if (keyType == null) {
+				throw new NullPointerException("Key type " + idx + " is null.");
+			}
+			this.keyHolders1[idx] = InstantiationUtil.instantiate(keyType, Key.class);
+			this.keyHolders2[idx] = InstantiationUtil.instantiate(keyType, Key.class);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+
+	/**
+	 * Reads the key fields of the given record into the reference key holders.
+	 *
+	 * @param reference The record to use as the reference for subsequent comparisons.
+	 * @throws NullKeyFieldException If {@code getFieldInto} reports {@code false} for a key field.
+	 */
+	@Override
+	public void setReference(Record reference) {
+		for (int idx = 0; idx < this.keyFields1.length; idx++) {
+			final int fieldPos = this.keyFields1[idx];
+			final boolean present = reference.getFieldInto(fieldPos, this.keyHolders1[idx]);
+			if (!present) {
+				throw new NullKeyFieldException(fieldPos);
+			}
+		}
+	}
+
+
+	/**
+	 * Checks whether all key fields of the candidate are equal to those of the reference.
+	 *
+	 * @param candidate The record to compare against the reference.
+	 * @return {@code true} if every key field equals the corresponding reference key,
+	 *         {@code false} at the first key field that differs.
+	 * @throws NullKeyFieldException If a key field of the candidate is {@code null}.
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public boolean equalToReference(Record candidate) {
+		for (int idx = 0; idx < this.keyFields2.length; idx++) {
+			final int fieldPos = this.keyFields2[idx];
+			final Key candidateKey = candidate.getField(fieldPos, this.keyHolders2[idx]);
+			if (candidateKey == null) {
+				throw new NullKeyFieldException(fieldPos);
+			}
+			if (!candidateKey.equals(this.keyHolders1[idx])) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+
+	/**
+	 * Compares the key fields of the candidate to those of the reference, key by key.
+	 *
+	 * @param candidate The record to compare against the reference.
+	 * @return The result of {@code compareTo} (candidate key against reference key) for the first
+	 *         key field that differs, or {@code 0} if all key fields compare as equal.
+	 * @throws NullKeyFieldException If a key field of the candidate is {@code null}.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public int compareToReference(Record candidate) {
+		for (int idx = 0; idx < this.keyFields2.length; idx++) {
+			final int fieldPos = this.keyFields2[idx];
+			final Key candidateKey = candidate.getField(fieldPos, this.keyHolders2[idx]);
+			if (candidateKey == null) {
+				throw new NullKeyFieldException(fieldPos);
+			}
+			
+			final int result = candidateKey.compareTo(this.keyHolders1[idx]);
+			if (result != 0) {
+				return result;
+			}
+		}
+		return 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparatorFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparatorFactory.java
new file mode 100644
index 0000000..c1741b0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordPairComparatorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+
+/**
+ * A factory for a {@link TypePairComparator} for {@link Record}. The pair comparator uses a subset
+ * of the fields for the comparison. That subset of fields (positions and types) is taken from the
+ * two {@link RecordComparator}s that are handed to the factory methods.
+ */
+public class RecordPairComparatorFactory implements TypePairComparatorFactory<Record, Record> {
+	
+	private static final RecordPairComparatorFactory INSTANCE = new RecordPairComparatorFactory();
+	
+	/**
+	 * Gets an instance of the comparator factory. The instance is shared, since the factory is a
+	 * stateless class. 
+	 * 
+	 * @return An instance of the comparator factory.
+	 */
+	public static final RecordPairComparatorFactory get() {
+		return INSTANCE;
+	}
+
+	/**
+	 * Creates a pair comparator whose reference side uses the key positions of
+	 * {@code comparator1} and whose candidate side uses the key positions of {@code comparator2}.
+	 *
+	 * @param comparator1 The comparator describing the keys of the reference side.
+	 * @param comparator2 The comparator describing the keys of the candidate side.
+	 * @return The pair comparator.
+	 * @throws IllegalArgumentException If one of the comparators is not a {@link RecordComparator},
+	 *                                  or if the two comparators differ in the number or the
+	 *                                  types of their key fields.
+	 */
+	@Override
+	public TypePairComparator<Record, Record> createComparator12(
+			TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
+	{
+		return createPairComparator(comparator1, comparator2, false);
+	}
+
+	/**
+	 * Creates a pair comparator whose reference side uses the key positions of
+	 * {@code comparator2} and whose candidate side uses the key positions of {@code comparator1}.
+	 *
+	 * @param comparator1 The comparator describing the keys of the candidate side.
+	 * @param comparator2 The comparator describing the keys of the reference side.
+	 * @return The pair comparator.
+	 * @throws IllegalArgumentException If one of the comparators is not a {@link RecordComparator},
+	 *                                  or if the two comparators differ in the number or the
+	 *                                  types of their key fields.
+	 */
+	@Override
+	public TypePairComparator<Record, Record> createComparator21(
+		TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
+	{
+		return createPairComparator(comparator1, comparator2, true);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Shared implementation of both factory methods: validates the two comparators and builds
+	 * the pair comparator, optionally with the reference and candidate sides swapped.
+	 */
+	private static RecordPairComparator createPairComparator(
+			TypeComparator<Record> comparator1, TypeComparator<Record> comparator2, boolean referenceIsSecond)
+	{
+		if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) {
+			throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators.");
+		}
+		final RecordComparator prc1 = (RecordComparator) comparator1;
+		final RecordComparator prc2 = (RecordComparator) comparator2;
+		
+		final int[] pos1 = prc1.getKeyPositions();
+		final int[] pos2 = prc2.getKeyPositions();
+		
+		final Class<? extends Key<?>>[] types1 = prc1.getKeyTypes();
+		final Class<? extends Key<?>>[] types2 = prc2.getKeyTypes();
+		
+		checkComparators(pos1, pos2, types1, types2);
+		
+		// the key types were verified to be identical, so the types of the first comparator
+		// serve for both sides
+		return referenceIsSecond
+				? new RecordPairComparator(pos2, pos1, types1)
+				: new RecordPairComparator(pos1, pos2, types1);
+	}
+
+	/**
+	 * Verifies that both comparators work on the same number of key fields and on identical
+	 * key types, throwing an {@link IllegalArgumentException} otherwise.
+	 */
+	private static void checkComparators(int[] pos1, int[] pos2, 
+							Class<? extends Key<?>>[] types1, Class<? extends Key<?>>[] types2)
+	{
+		if (pos1.length != pos2.length || types1.length != types2.length) {
+			throw new IllegalArgumentException(
+				"The given pair of RecordComparators does not operate on the same number of fields.");
+		}
+		for (int i = 0; i < types1.length; i++) {
+			if (!types1[i].equals(types2[i])) {
+				throw new IllegalArgumentException(
+				"The given pair of RecordComparators does not operate on the same data types.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
new file mode 100644
index 0000000..146ccd0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Record;
+
+
+/**
+ * Serializer for {@link Record}: creates, copies, serializes and deserializes records by
+ * delegating to the methods of the record itself.
+ */
+public final class RecordSerializer extends TypeSerializer<Record> {
+	
+	private static final long serialVersionUID = 1L;
+
+	/** The shared singleton instance. */
+	private static final RecordSerializer INSTANCE = new RecordSerializer();
+	
+	/** Byte value in which only the most significant bit is set. */
+	private static final int MAX_BIT = 0x80;
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the shared serializer instance.
+	 *
+	 * @return The singleton instance of the serializer.
+	 */
+	public static RecordSerializer get() {
+		return INSTANCE;
+	}
+	
+	/**
+	 * Private constructor, instances are obtained through {@link #get()}.
+	 */
+	private RecordSerializer() {}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/** Always {@code false}. */
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	/** Returns this instance itself, because the serializer holds no state. */
+	@Override
+	public RecordSerializer duplicate() {
+		return this;
+	}
+	
+	/** Creates a new, empty record. */
+	@Override
+	public Record createInstance() {
+		return new Record(); 
+	}
+
+	/** Returns the copy produced by {@code createCopy()} of the given record. */
+	@Override
+	public Record copy(Record from) {
+		return from.createCopy();
+	}
+	
+	/** Copies the given record into {@code reuse} via {@code copyTo} and returns {@code reuse}. */
+	@Override
+	public Record copy(Record from, Record reuse) {
+		from.copyTo(reuse);
+		return reuse;
+	}
+	
+	/** Always {@code -1}. */
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/** Lets the record write itself to the target view. */
+	@Override
+	public void serialize(Record record, DataOutputView target) throws IOException {
+		record.serialize(target);
+	}
+
+	/** Reads a record from the source view into a newly created record. */
+	@Override
+	public Record deserialize(DataInputView source) throws IOException {
+		final Record fresh = new Record();
+		return deserialize(fresh, source);
+	}
+	
+	/** Lets the given record read its contents from the source view and returns that record. */
+	@Override
+	public Record deserialize(Record target, DataInputView source) throws IOException {
+		target.deserialize(source);
+		return target;
+	}
+	
+	/**
+	 * Copies one serialized record from the source view to the target view without
+	 * deserializing it. The leading bytes are read as a variable-length integer (seven payload
+	 * bits per byte, a set most significant bit marks that another byte follows) and forwarded
+	 * unchanged; the decoded value is then passed to {@code target.write(source, length)} as the
+	 * number of bytes to transfer.
+	 */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int length = source.readUnsignedByte();
+		target.writeByte(length);
+		
+		if (length >= MAX_BIT) {
+			// multi-byte header: keep the low seven bits and append the following groups
+			length &= 0x7f;
+			int shift = 7;
+			
+			int next = source.readUnsignedByte();
+			while (next >= MAX_BIT) {
+				target.writeByte(next);
+				length |= (next & 0x7f) << shift;
+				shift += 7;
+				next = source.readUnsignedByte();
+			}
+			
+			// last header byte, its most significant bit is not set
+			target.writeByte(next);
+			length |= next << shift;
+		}
+		
+		target.write(source, length);
+	}
+
+	/** Equal to every other {@code RecordSerializer} that accepts this instance in {@code canEqual}. */
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof RecordSerializer)) {
+			return false;
+		}
+		return ((RecordSerializer) obj).canEqual(this);
+	}
+
+	/** Accepts any {@code RecordSerializer}. */
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof RecordSerializer;
+	}
+
+	/** Uses the hash code of the class, so all instances hash alike. */
+	@Override
+	public int hashCode() {
+		return RecordSerializer.class.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializerFactory.java
new file mode 100644
index 0000000..ea14591
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializerFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.testutils.recordutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+
+/**
+ * A factory that creates the serializer for the {@link Record} data type.
+ */
+public class RecordSerializerFactory implements TypeSerializerFactory<Record> {
+	
+	/** The shared factory instance. */
+	private static final RecordSerializerFactory INSTANCE = new RecordSerializerFactory();
+	
+	/**
+	 * Returns the shared instance of the serializer factory. Sharing is possible because the
+	 * factory keeps no state.
+	 * 
+	 * @return The shared serializer factory.
+	 */
+	public static final RecordSerializerFactory get() {
+		return INSTANCE;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+
+	/** Writes nothing, the factory has no parameters. */
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+	}
+
+	/** Reads nothing, the factory has no parameters. */
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) {
+	}
+	
+
+	/** Returns the serializer obtained from {@link RecordSerializer#get()}. */
+	@Override
+	public TypeSerializer<Record> getSerializer() {
+		return RecordSerializer.get();
+	}
+
+	/** Returns {@code Record.class}. */
+	@Override
+	public Class<Record> getDataType() {
+		return Record.class;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** Constant hash code, matching {@link #equals(Object)} which treats all instances as equal. */
+	@Override
+	public int hashCode() {
+		return 31;
+	}
+	
+	/** Equal to any object whose class is exactly {@code RecordSerializerFactory}. */
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		return RecordSerializerFactory.class == obj.getClass();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
index b71fdb0..3cdf775 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/066913e2/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
index 8a9f8ba..fae3767 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;


Mime
View raw message