flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/5] [FLINK-1111] Move Basic and Array Type Information into "flink-core" Project
Date Mon, 22 Sep 2014 15:40:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
index 6e92d44..465ef83 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 4de7311..7a59570 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.TypeInformation;
 
 public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index 894a4a2..eeeb6c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
index c2973b7..18d263f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanRightUnwrappingJoinOperator<I1, I2, OUT, K>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index c222ff2..22cbce7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 4da981c..54558ae 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.TypeInformation;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
index 588c910..e4b815b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FileDataSinkBase;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.java.typeutils.RecordTypeInfo;
 import org.apache.flink.types.Nothing;
-import org.apache.flink.types.NothingTypeInfo;
 import org.apache.flink.types.Record;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
index f47b53e..3ea8157 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.java.typeutils.RecordTypeInfo;
 import org.apache.flink.types.Nothing;
-import org.apache.flink.types.NothingTypeInfo;
 import org.apache.flink.types.Record;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java
deleted file mode 100644
index 9ccda5e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-
-
-/**
- *
- */
-public interface AtomicType<T> {
-	
-	TypeComparator<T> createComparator(boolean sortOrderAscending);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java
deleted file mode 100644
index 2518590..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
-import org.apache.flink.api.java.functions.InvalidTypesException;
-import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer;
-import org.apache.flink.types.TypeInformation;
-
-public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
-
-	public static final BasicArrayTypeInfo<String[], String> STRING_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<String[], String>(String[].class, BasicTypeInfo.STRING_TYPE_INFO);
-	
-	public static final BasicArrayTypeInfo<Boolean[], Boolean> BOOLEAN_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Boolean[], Boolean>(Boolean[].class, BasicTypeInfo.BOOLEAN_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Byte[], Byte> BYTE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Byte[], Byte>(Byte[].class, BasicTypeInfo.BYTE_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Short[], Short> SHORT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Short[], Short>(Short[].class, BasicTypeInfo.SHORT_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Integer[], Integer> INT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Integer[], Integer>(Integer[].class, BasicTypeInfo.INT_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Long[], Long> LONG_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Long[], Long>(Long[].class, BasicTypeInfo.LONG_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Float[], Float> FLOAT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Float[], Float>(Float[].class, BasicTypeInfo.FLOAT_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Double[], Double> DOUBLE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Double[], Double>(Double[].class, BasicTypeInfo.DOUBLE_TYPE_INFO);
-	public static final BasicArrayTypeInfo<Character[], Character> CHAR_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Character[], Character>(Character[].class, BasicTypeInfo.CHAR_TYPE_INFO);
-	
-	// --------------------------------------------------------------------------------------------
-
-	private final Class<T> arrayClass;
-	private final Class<C> componentClass;
-	private final TypeInformation<C> componentInfo;
-
-	@SuppressWarnings("unchecked")
-	private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> componentInfo) {
-		this.arrayClass = arrayClass;
-		this.componentClass = (Class<C>) arrayClass.getComponentType();
-		this.componentInfo = componentInfo;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return this.arrayClass;
-	}
-
-	public Class<C> getComponentTypeClass() {
-		return this.componentClass;
-	}
-	
-	public TypeInformation<C> getComponentInfo() {
-		return componentInfo;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return false;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public TypeSerializer<T> createSerializer() {
-		// special case the string array
-		if (componentClass.equals(String.class)) {
-			return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
-		} else {
-			return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer());
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return this.getClass().getSimpleName()+"<"+this.componentInfo+">";
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	public static <X, C> BasicArrayTypeInfo<X, C> getInfoFor(Class<X> type) {
-		if (!type.isArray()) {
-			throw new InvalidTypesException("The given class is no array.");
-		}
-
-		// basic type arrays
-		return (BasicArrayTypeInfo<X, C>) TYPES.get(type);
-	}
-
-	private static final Map<Class<?>, BasicArrayTypeInfo<?, ?>> TYPES = new HashMap<Class<?>, BasicArrayTypeInfo<?, ?>>();
-
-	static {
-		TYPES.put(String[].class, STRING_ARRAY_TYPE_INFO);
-		TYPES.put(Boolean[].class, BOOLEAN_ARRAY_TYPE_INFO);
-		TYPES.put(Byte[].class, BYTE_ARRAY_TYPE_INFO);
-		TYPES.put(Short[].class, SHORT_ARRAY_TYPE_INFO);
-		TYPES.put(Integer[].class, INT_ARRAY_TYPE_INFO);
-		TYPES.put(Long[].class, LONG_ARRAY_TYPE_INFO);
-		TYPES.put(Float[].class, FLOAT_ARRAY_TYPE_INFO);
-		TYPES.put(Double[].class, DOUBLE_ARRAY_TYPE_INFO);
-		TYPES.put(Character[].class, CHAR_ARRAY_TYPE_INFO);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java
deleted file mode 100644
index d54c18f..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanComparator;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
-import org.apache.flink.api.common.typeutils.base.ByteComparator;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.common.typeutils.base.CharComparator;
-import org.apache.flink.api.common.typeutils.base.CharSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.FloatComparator;
-import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.ShortComparator;
-import org.apache.flink.api.common.typeutils.base.ShortSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.types.TypeInformation;
-
-
-/**
- *
- */
-public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
-
-	public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, StringSerializer.INSTANCE, StringComparator.class);
-	public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, BooleanSerializer.INSTANCE, BooleanComparator.class);
-	public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new BasicTypeInfo<Byte>(Byte.class, ByteSerializer.INSTANCE, ByteComparator.class);
-	public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new BasicTypeInfo<Short>(Short.class, ShortSerializer.INSTANCE, ShortComparator.class);
-	public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new BasicTypeInfo<Integer>(Integer.class, IntSerializer.INSTANCE, IntComparator.class);
-	public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new BasicTypeInfo<Long>(Long.class, LongSerializer.INSTANCE, LongComparator.class);
-	public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new BasicTypeInfo<Float>(Float.class, FloatSerializer.INSTANCE, FloatComparator.class);
-	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
-	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
-	
-	// --------------------------------------------------------------------------------------------
-
-	private final Class<T> clazz;
-	
-	private final TypeSerializer<T> serializer;
-	
-	private final Class<? extends TypeComparator<T>> comparatorClass;
-	
-	
-	private BasicTypeInfo(Class<T> clazz, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
-		this.clazz = clazz;
-		this.serializer = serializer;
-		this.comparatorClass = comparatorClass;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isBasicType() {
-		return true;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return this.clazz;
-	}
-	
-	@Override
-	public boolean isKeyType() {
-		return true;
-	}
-	
-	@Override
-	public TypeSerializer<T> createSerializer() {
-		return this.serializer;
-	}
-	
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending) {
-		return instantiateComparator(comparatorClass, sortOrderAscending);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return this.clazz.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof BasicTypeInfo) {
-			@SuppressWarnings("unchecked")
-			BasicTypeInfo<T> other = (BasicTypeInfo<T>) obj;
-			return this.clazz.equals(other.clazz);
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return clazz.getSimpleName();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static <X> BasicTypeInfo<X> getInfoFor(Class<X> type) {
-		if (type == null) {
-			throw new NullPointerException();
-		}
-		
-		@SuppressWarnings("unchecked")
-		BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
-		return info;
-	}
-	
-	private static <X> TypeComparator<X> instantiateComparator(Class<? extends TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
-		try {
-			Constructor<? extends TypeComparator<X>> constructor = comparatorClass.getConstructor(boolean.class);
-			return constructor.newInstance(ascendingOrder);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not initialize basic comparator " + comparatorClass.getName(), e);
-		}
-	}
-	
-	private static final Map<Class<?>, BasicTypeInfo<?>> TYPES = new HashMap<Class<?>, BasicTypeInfo<?>>();
-	
-	static {
-		TYPES.put(String.class, STRING_TYPE_INFO);
-		TYPES.put(Boolean.class, BOOLEAN_TYPE_INFO);
-		TYPES.put(boolean.class, BOOLEAN_TYPE_INFO);
-		TYPES.put(Byte.class, BYTE_TYPE_INFO);
-		TYPES.put(byte.class, BYTE_TYPE_INFO);
-		TYPES.put(Short.class, SHORT_TYPE_INFO);
-		TYPES.put(short.class, SHORT_TYPE_INFO);
-		TYPES.put(Integer.class, INT_TYPE_INFO);
-		TYPES.put(int.class, INT_TYPE_INFO);
-		TYPES.put(Long.class, LONG_TYPE_INFO);
-		TYPES.put(long.class, LONG_TYPE_INFO);
-		TYPES.put(Float.class, FLOAT_TYPE_INFO);
-		TYPES.put(float.class, FLOAT_TYPE_INFO);
-		TYPES.put(Double.class, DOUBLE_TYPE_INFO);
-		TYPES.put(double.class, DOUBLE_TYPE_INFO);
-		TYPES.put(Character.class, CHAR_TYPE_INFO);
-		TYPES.put(char.class, CHAR_TYPE_INFO);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java
deleted file mode 100644
index 579eb39..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-
-
-/**
- *
- */
-public interface CompositeType<T> {
-
-	TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index cbc36ed..6cbefed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.types.TypeInformation;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
index b14f16d..d3a70a6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils;
 
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 0802280..e770898 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -22,11 +22,12 @@ import java.lang.reflect.GenericArrayType;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer;
-import org.apache.flink.types.TypeInformation;
 
 public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index f30be89..105a275 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Field;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 class PojoField {
 	public Field field;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 9fddede..efdf152 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -26,11 +26,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.CompositeType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.types.TypeInformation;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java
deleted file mode 100644
index fc4dea8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
-import org.apache.flink.api.java.functions.InvalidTypesException;
-import org.apache.flink.types.TypeInformation;
-
-public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
-
-	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE);
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private final Class<T> arrayClass;
-	private final TypeSerializer<T> serializer;
-
-	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer) {
-		this.arrayClass = arrayClass;
-		this.serializer = serializer;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return this.arrayClass;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<T> createSerializer() {
-		return this.serializer;
-	}
-	
-	@Override
-	public String toString() {
-		return arrayClass.getComponentType().getName() + "[]";
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
-		if (!type.isArray()) {
-			throw new InvalidTypesException("The given class is no array.");
-		}
-
-		// basic type arrays
-		return (PrimitiveArrayTypeInfo<X>) TYPES.get(type);
-	}
-
-	private static final Map<Class<?>, PrimitiveArrayTypeInfo<?>> TYPES = new HashMap<Class<?>, PrimitiveArrayTypeInfo<?>>();
-
-	static {
-		TYPES.put(boolean[].class, BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(byte[].class, BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(short[].class, SHORT_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(int[].class, INT_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(long[].class, LONG_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(float[].class, FLOAT_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
-		TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index 980b0f6..445fef9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.types.Record;
-import org.apache.flink.types.TypeInformation;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
index 415a026..f20060e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * This interface can be implemented by functions and input formats to tell the framework

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 25be7f1..1f0d1cd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -20,11 +20,13 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.types.TypeInformation;
 
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index bca29dc..7701a1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -20,7 +20,8 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.CompositeType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements CompositeType<T> {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a8a833f..3bc838c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -36,16 +36,19 @@ import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.types.Value;
 import org.apache.hadoop.io.Writable;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index c73c79f..469b4ec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -22,7 +22,10 @@ package org.apache.flink.api.java.typeutils;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Value;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 0375af6..cf43988 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.types.Value;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index a2e241f..d21c371 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.flink.types.TypeInformation;
 import org.apache.hadoop.io.Writable;
 
 public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
index 6911580..7969740 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
@@ -43,9 +43,9 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
 	
 	private transient T tempReference;
 
-	private final Comparable[] extractedKey = new Comparable[1];
+	private final Comparable<?>[] extractedKey = new Comparable[1];
 
-	private final TypeComparator[] comparators = new TypeComparator[] {this};
+	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
 
 	public CopyableValueComparator(boolean ascending, Class<T> type) {
 		this.type = type;
@@ -132,7 +132,7 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
 	}
 
 	@Override
-	public TypeComparator[] getComparators() {
+	public TypeComparator<?>[] getComparators() {
 		return comparators;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java
deleted file mode 100644
index e67334d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-/**
- * @param <C> The component type
- */
-public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Class<C> componentClass;
-	
-	private final TypeSerializer<C> componentSerializer;
-	
-	private final C[] EMPTY;
-	
-	
-	
-	public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
-		if (componentClass == null || componentSerializer == null) {
-			throw new NullPointerException();
-		}
-		
-		this.componentClass = componentClass;
-		this.componentSerializer = componentSerializer;
-		this.EMPTY = create(0);
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public boolean isStateful() {
-		return this.componentSerializer.isStateful();
-	}
-
-	
-	@Override
-	public C[] createInstance() {
-		return EMPTY;
-	}
-
-	@Override
-	public C[] copy(C[] from, C[] reuse) {
-		C[] copy = create(from.length);
-
-		for (int i = 0; i < copy.length; i++) {
-			copy[i] = this.componentSerializer.copy(from[i], this.componentSerializer.createInstance());
-		}
-
-		return copy;
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(C[] value, DataOutputView target) throws IOException {
-		target.writeInt(value.length);
-		for (int i = 0; i < value.length; i++) {
-			C val = value[i];
-			if (val == null) {
-				target.writeBoolean(false);
-			} else {
-				target.writeBoolean(true);
-				componentSerializer.serialize(val, target);
-			}
-		}
-	}
-
-	@Override
-	public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
-		int len = source.readInt();
-		
-		if (reuse.length != len) {
-			reuse = create(len);
-		}
-		
-		for (int i = 0; i < len; i++) {
-			boolean isNonNull = source.readBoolean();
-			if (isNonNull) {
-				reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
-			} else {
-				reuse[i] = null;
-			}
-		}
-		
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int len = source.readInt();
-		target.writeInt(len);
-		
-		for (int i = 0; i < len; i++) {
-			boolean isNonNull = source.readBoolean();
-			target.writeBoolean(isNonNull);
-			
-			if (isNonNull) {
-				componentSerializer.copy(source, target);
-			}
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	private final C[] create(int len) {
-		return (C[]) Array.newInstance(componentClass, len);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return componentClass.hashCode() + componentSerializer.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof GenericArraySerializer) {
-			GenericArraySerializer<?> other = (GenericArraySerializer<?>) obj;
-			return this.componentClass == other.componentClass &&
-					this.componentSerializer.equals(other.componentSerializer);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
deleted file mode 100644
index bf8d56c..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime.record;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-
-
-/**
- * Implementation of the {@link TypeComparator} interface for the pact record. Instances of this class
- * are parameterized with which fields are relevant to the comparison. 
- */
-public final class RecordComparator extends TypeComparator<Record> {
-	
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * A sequence of prime numbers to be used for salting the computed hash values.
-	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
-	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
-	 * 
-	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
-	 * @see: http://oeis.org/A068652
-	 */
-	private static final int[] HASH_SALT = new int[] { 
-		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
-		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
-		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
-		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
-	
-	private final int[] keyFields;
-	
-	@SuppressWarnings("rawtypes")
-	private final Key[] keyHolders, transientKeyHolders;
-	
-	private final Record temp1, temp2;
-	
-	private final boolean[] ascending;
-	
-	private final int[] normalizedKeyLengths;
-	
-	private final int numLeadingNormalizableKeys;
-	
-	private final int normalizableKeyPrefixLen;
-	
-
-	/**
-	 * Creates a new comparator that compares Pact Records by the subset of fields as described
-	 * by the given key positions and types. All order comparisons will assume ascending order on all fields.
-	 * 
-	 * @param keyFields The positions of the key fields.
-	 * @param keyTypes The types (classes) of the key fields.
-	 */
-	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes) {
-		this(keyFields, keyTypes, null);
-	}
-	
-	/**
-	 * Creates a new comparator that compares Pact Records by the subset of fields as described
-	 * by the given key positions and types.
-	 * 
-	 * @param keyFields The positions of the key fields.
-	 * @param keyTypes The types (classes) of the key fields.
-	 * @param sortDirection The direction for sorting. A value of <i>true</i> indicates ascending for an attribute,
-	 *                  a value of <i>false</i> indicated descending. If the parameter is <i>null</i>, then
-	 *                  all order comparisons will assume ascending order on all fields.
-	 */
-	public RecordComparator(int[] keyFields, Class<? extends Key<?>>[] keyTypes, boolean[] sortDirection) {
-		this.keyFields = keyFields;
-		
-		// instantiate fields to extract keys into
-		this.keyHolders = new Key[keyTypes.length];
-		this.transientKeyHolders = new Key[keyTypes.length];
-		for (int i = 0; i < keyTypes.length; i++) {
-			if (keyTypes[i] == null) {
-				throw new NullPointerException("Key type " + i + " is null.");
-			}
-			this.keyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-			this.transientKeyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-		}
-		
-		// set up auxiliary fields for normalized key support
-		this.normalizedKeyLengths = new int[keyFields.length];
-		int nKeys = 0;
-		int nKeyLen = 0;
-		boolean inverted = false;
-		for (int i = 0; i < this.keyHolders.length; i++) {
-			Key<?> k = this.keyHolders[i];
-			if (k instanceof NormalizableKey) {
-				if (sortDirection != null) {
-					if (sortDirection[i] && inverted) {
-						break;
-					} else if (i == 0 && !sortDirection[0]) {
-						inverted = true;
-					}
-				}
-				nKeys++;
-				final int len = ((NormalizableKey<?>) k).getMaxNormalizedKeyLen();
-				if (len < 0) {
-					throw new RuntimeException("Data type " + k.getClass().getName() + 
-						" specifies an invalid length for the normalized key: " + len);
-				}
-				this.normalizedKeyLengths[i] = len;
-				nKeyLen += this.normalizedKeyLengths[i];
-				if (nKeyLen < 0) {
-					nKeyLen = Integer.MAX_VALUE;
-					break;
-				}
-			} else {
-				break;
-			}
-		}
-		this.numLeadingNormalizableKeys = nKeys;
-		this.normalizableKeyPrefixLen = nKeyLen;
-		
-		this.temp1 = new Record();
-		this.temp2 = new Record();
-		
-		if (sortDirection != null) {
-			this.ascending = sortDirection;
-		} else {
-			this.ascending = new boolean[keyFields.length];
-			for (int i = 0; i < this.ascending.length; i++) {
-				this.ascending[i] = true;
-			}
-		}
-	}
-	
-	/**
-	 * Copy constructor.
-	 * 
-	 * @param toCopy Comparator to copy.
-	 */
-	private RecordComparator(RecordComparator toCopy) {
-		this.keyFields = toCopy.keyFields;
-		this.keyHolders = new Key[toCopy.keyHolders.length];
-		this.transientKeyHolders = new Key[toCopy.keyHolders.length];
-		
-		try {
-			for (int i = 0; i < this.keyHolders.length; i++) {
-				this.keyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
-				this.transientKeyHolders[i] = toCopy.keyHolders[i].getClass().newInstance();
-			}
-		} catch (Exception ex) {
-			// this should never happen, because the classes have been instantiated before. Report for debugging.
-			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
-		}
-		
-		this.normalizedKeyLengths = toCopy.normalizedKeyLengths;
-		this.numLeadingNormalizableKeys = toCopy.numLeadingNormalizableKeys;
-		this.normalizableKeyPrefixLen = toCopy.normalizableKeyPrefixLen;
-		this.ascending = toCopy.ascending;
-		
-		this.temp1 = new Record();
-		this.temp2 = new Record();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public int hash(Record object) {
-		int i = 0;
-		try {
-			int code = 0;
-			for (; i < this.keyFields.length; i++) {
-				code ^= object.getField(this.keyFields[i], this.transientKeyHolders[i]).hashCode();
-				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-			}
-			return code;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(this.keyFields[i]);
-		}
-	}
-
-
-	@Override
-	public void setReference(Record toCompare) {
-		for (int i = 0; i < this.keyFields.length; i++) {
-			if (!toCompare.getFieldInto(this.keyFields[i], this.keyHolders[i])) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			}
-		}
-	}
-
-
-	@Override
-	public boolean equalToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields.length; i++) {
-			final Key<?> k = candidate.getField(this.keyFields[i], this.transientKeyHolders[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			} else if (!k.equals(this.keyHolders[i])) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-
-	@Override
-	public int compareToReference(TypeComparator<Record> referencedAccessors) {
-		final RecordComparator pra = (RecordComparator) referencedAccessors;
-		
-		for (int i = 0; i < this.keyFields.length; i++) {
-			@SuppressWarnings("unchecked")
-			final int comp = pra.keyHolders[i].compareTo(this.keyHolders[i]);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public int compare(Record first, Record second) {
-		int i = 0;
-		try {
-			for (; i < this.keyFields.length; i++) {
-				Key k1 = first.getField(this.keyFields[i], this.keyHolders[i]);
-				Key k2 = second.getField(this.keyFields[i], this.transientKeyHolders[i]);
-				int cmp = k1.compareTo(k2);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		}
-		catch (NullPointerException e) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
-		this.temp1.read(source1);
-		this.temp2.read(source2);
-		
-		for (int i = 0; i < this.keyFields.length; i++) {
-			@SuppressWarnings("rawtypes")
-			final Key k1 = this.temp1.getField(this.keyFields[i], this.keyHolders[i]);
-			@SuppressWarnings("rawtypes")
-			final Key k2 = this.temp2.getField(this.keyFields[i], this.transientKeyHolders[i]);
-			
-			if (k1 == null || k2 == null) {
-				throw new NullKeyFieldException(this.keyFields[i]);
-			}
-			
-			@SuppressWarnings("unchecked")
-			final int comp = k1.compareTo(k2);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.numLeadingNormalizableKeys > 0;
-	}
-
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.normalizableKeyPrefixLen;
-	}
-	
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.numLeadingNormalizableKeys < this.keyFields.length ||
-				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-				this.normalizableKeyPrefixLen > keyBytes;
-	}
-
-	@Override
-	public void putNormalizedKey(Record record, MemorySegment target, int offset, int numBytes) {
-		int i = 0;
-		try {
-			for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
-			{
-				int len = this.normalizedKeyLengths[i]; 
-				len = numBytes >= len ? len : numBytes;
-				((NormalizableKey<?>) record.getField(this.keyFields[i], this.transientKeyHolders[i])).copyNormalizedKey(target, offset, len);
-				numBytes -= len;
-				offset += len;
-			}
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i]);
-		}
-	}
-	
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return !this.ascending[0];
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(Record record, DataOutputView target) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Record readWithKeyDenormalization(Record reuse, DataInputView source) {
-		throw new UnsupportedOperationException();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public RecordComparator duplicate() {
-		return new RecordComparator(this);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                           Non Standard Comparator Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public final int[] getKeyPositions() {
-		return this.keyFields;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public final Class<? extends Key<?>>[] getKeyTypes() {
-		final Class<? extends Key<?>>[] keyTypes = new Class[this.keyHolders.length];
-		for (int i = 0; i < keyTypes.length; i++) {
-			keyTypes[i] = (Class<? extends Key<?>>) this.keyHolders[i].getClass();
-		}
-		return keyTypes;
-	}
-	
-	public final Key<?>[] getKeysAsCopy(Record record) {
-		try {
-			final Key<?>[] keys = new Key[this.keyFields.length];
-			for (int i = 0; i < keys.length; i++) {
-				keys[i] = this.keyHolders[i].getClass().newInstance();
-			}
-			if(!record.getFieldsInto(this.keyFields, keys)) {
-				throw new RuntimeException("Could not extract keys from record.");
-			}
-			return keys;
-		} catch (Exception ex) {
-			// this should never happen, because the classes have been instantiated before. Report for debugging.
-			throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex);
-		}
-	}
-
-	@Override
-	public Object[] extractKeys(Record record) {
-		throw new UnsupportedOperationException("Record does not support extactKeys and " +
-				"getComparators. This cannot be used with the GenericPairComparator.");
-	}
-
-	@Override
-	public TypeComparator[] getComparators() {
-		throw new UnsupportedOperationException("Record does not support extactKeys and " +
-				"getComparators. This cannot be used with the GenericPairComparator.");
-	}
-
-	@Override
-	public boolean supportsCompareAgainstReference() {
-		return true;
-	}
-
-	@Override
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public final int compareAgainstReference(Comparable[] keys) {
-		for (int i = 0; i < this.keyFields.length; i++)
-		{
-			final int comp = keys[i].compareTo(this.keyHolders[i]);
-			if (comp != 0) {
-				return this.ascending[i] ? comp : -comp;
-			}
-		}
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java
deleted file mode 100644
index 5a30969..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime.record;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-/**
- * A factory for a {@link org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}. The comparator uses a subset of
- * the fields for the comparison. That subset of fields (positions and types) is read from the
- * supplied configuration.
- */
-public class RecordComparatorFactory implements TypeComparatorFactory<Record> {
-	
-	private static final String NUM_KEYS = "numkeys";
-	
-	private static final String KEY_POS_PREFIX = "keypos.";
-	
-	private static final String KEY_CLASS_PREFIX = "keyclass.";
-	
-	private static final String KEY_SORT_DIRECTION_PREFIX = "key-direction.";
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private int[] positions;
-	
-	private Class<? extends Key<?>>[] types;
-	
-	private boolean[] sortDirections;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public RecordComparatorFactory() {
-		// do nothing, allow to be configured via config
-	}
-	
-	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types) {
-		this(positions, types, null);
-	}
-	
-	public RecordComparatorFactory(int[] positions, Class<? extends Key<?>>[] types, boolean[] sortDirections) {
-		if (positions == null || types == null) {
-			throw new NullPointerException();
-		}
-		if (positions.length != types.length) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.positions = positions;
-		this.types = types;
-		
-		if (sortDirections == null) {
-			this.sortDirections = new boolean[positions.length];
-			Arrays.fill(this.sortDirections, true);
-		} else if (sortDirections.length != positions.length) {
-			throw new IllegalArgumentException();
-		} else {
-			this.sortDirections = sortDirections;
-		}
-	}
-
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		for (int i = 0; i < this.positions.length; i++) {
-			if (this.positions[i] < 0) {
-				throw new IllegalArgumentException("The key position " + i + " is invalid: " + this.positions[i]);
-			}
-			if (this.types[i] == null || !Key.class.isAssignableFrom(this.types[i])) {
-				throw new IllegalArgumentException("The key type " + i + " is null or not implenting the interface " + 
-					Key.class.getName() + ".");
-			}
-		}
-		
-		// write the config
-		config.setInteger(NUM_KEYS, this.positions.length);
-		for (int i = 0; i < this.positions.length; i++) {
-			config.setInteger(KEY_POS_PREFIX + i, this.positions[i]);
-			config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName());
-			config.setBoolean(KEY_SORT_DIRECTION_PREFIX + i, this.sortDirections[i]);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		// figure out how many key fields there are
-		final int numKeyFields = config.getInteger(NUM_KEYS, -1);
-		if (numKeyFields < 0) {
-			throw new IllegalConfigurationException("The number of keys for the comparator is invalid: " + numKeyFields);
-		}
-		
-		final int[] positions = new int[numKeyFields];
-		final Class<? extends Key<?>>[] types = new Class[numKeyFields];
-		final boolean[] direction = new boolean[numKeyFields];
-		
-		// read the individual key positions and types
-		for (int i = 0; i < numKeyFields; i++) {
-			// next key position
-			final int p = config.getInteger(KEY_POS_PREFIX + i, -1);
-			if (p >= 0) {
-				positions[i] = p;
-			} else {
-				throw new IllegalConfigurationException("Contained invalid position for key no positions for keys.");
-			}
-			
-			// next key type
-			final String name = config.getString(KEY_CLASS_PREFIX + i, null);
-			if (name != null) {
-				types[i] = (Class<? extends Key<?>>) Class.forName(name, true, cl).asSubclass(Key.class);
-			} else {
-				throw new IllegalConfigurationException("The key type (" + i +
-					") for the comparator is null"); 
-			}
-			
-			// next key sort direction
-			direction[i] = config.getBoolean(KEY_SORT_DIRECTION_PREFIX + i, true);
-		}
-		
-		this.positions = positions;
-		this.types = types;
-		this.sortDirections = direction;
-	}
-	
-
-	@Override
-	public RecordComparator createComparator() {
-		return new RecordComparator(this.positions, this.types, this.sortDirections);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java
deleted file mode 100644
index 807814d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime.record;
-
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-
-
-/**
- * Implementation of the {@link TypePairComparator} interface for Pact Records. The equality is established on a set of
- * key fields. The indices of the key fields may be different on the reference and candidate side.
- */
-public class RecordPairComparator extends TypePairComparator<Record, Record>  {
-	
-	private final int[] keyFields1, keyFields2;			// arrays with the positions of the keys in the records
-	
-	@SuppressWarnings("rawtypes")
-	private final Key[] keyHolders1, keyHolders2;		// arrays with mutable objects for the key types
-	
-	
-	public RecordPairComparator(int[] keyFieldsReference, int[] keyFieldsCandidate, Class<? extends Key<?>>[] keyTypes) {
-		if (keyFieldsReference.length != keyFieldsCandidate.length || keyFieldsCandidate.length != keyTypes.length) {
-			throw new IllegalArgumentException(
-				"The arrays describing the key positions and types must be of the same length.");
-		}
-		this.keyFields1 = keyFieldsReference;
-		this.keyFields2 = keyFieldsCandidate;
-		
-		// instantiate fields to extract keys into
-		this.keyHolders1 = new Key[keyTypes.length];
-		this.keyHolders2 = new Key[keyTypes.length];
-		
-		for (int i = 0; i < keyTypes.length; i++) {
-			if (keyTypes[i] == null) {
-				throw new NullPointerException("Key type " + i + " is null.");
-			}
-			this.keyHolders1[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-			this.keyHolders2[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public void setReference(Record reference) {
-		for (int i = 0; i < this.keyFields1.length; i++) {
-			if (!reference.getFieldInto(this.keyFields1[i], this.keyHolders1[i])) {
-				throw new NullKeyFieldException(this.keyFields1[i]);
-			}
-		}
-	}
-
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public boolean equalToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields2.length; i++) {
-			final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields2[i]);
-			} else if (!k.equals(this.keyHolders1[i])) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public int compareToReference(Record candidate) {
-		for (int i = 0; i < this.keyFields2.length; i++) {
-			final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]);
-			if (k == null) {
-				throw new NullKeyFieldException(this.keyFields2[i]);
-			} else {
-				final int comp = k.compareTo(this.keyHolders1[i]);
-				if (comp != 0) {
-					return comp;
-				}
-			}
-		}
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java
deleted file mode 100644
index 0df584e..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime.record;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-/**
- * A factory for a {@link TypePairComparator} for {@link Record}. The comparator uses a subset of
- * the fields for the comparison. That subset of fields (positions and types) is read from the
- * supplied configuration.
- */
-public class RecordPairComparatorFactory implements TypePairComparatorFactory<Record, Record> {
-	
-	private static final RecordPairComparatorFactory INSTANCE = new RecordPairComparatorFactory();
-	
-	/**
-	 * Gets an instance of the comparator factory. The instance is shared, since the factory is a
-	 * stateless class. 
-	 * 
-	 * @return An instance of the comparator factory.
-	 */
-	public static final RecordPairComparatorFactory get() {
-		return INSTANCE;
-	}
-
-	@Override
-	public TypePairComparator<Record, Record> createComparator12(
-			TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
-	{
-		if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) {
-			throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators.");
-		}
-		final RecordComparator prc1 = (RecordComparator) comparator1;
-		final RecordComparator prc2 = (RecordComparator) comparator2;
-		
-		final int[] pos1 = prc1.getKeyPositions();
-		final int[] pos2 = prc2.getKeyPositions();
-		
-		final Class<? extends Key<?>>[] types1 = prc1.getKeyTypes();
-		final Class<? extends Key<?>>[] types2 = prc2.getKeyTypes();
-		
-		checkComparators(pos1, pos2, types1, types2);
-		
-		return new RecordPairComparator(pos1, pos2, types1);
-	}
-
-	@Override
-	public TypePairComparator<Record, Record> createComparator21(
-		TypeComparator<Record> comparator1,	TypeComparator<Record> comparator2)
-	{
-		if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) {
-			throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators.");
-		}
-		final RecordComparator prc1 = (RecordComparator) comparator1;
-		final RecordComparator prc2 = (RecordComparator) comparator2;
-		
-		final int[] pos1 = prc1.getKeyPositions();
-		final int[] pos2 = prc2.getKeyPositions();
-		
-		final Class<? extends Key<?>>[] types1 = prc1.getKeyTypes();
-		final Class<? extends Key<?>>[] types2 = prc2.getKeyTypes();
-		
-		checkComparators(pos1, pos2, types1, types2);
-		
-		return new RecordPairComparator(pos2, pos1, types1);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	private static final void checkComparators(int[] pos1, int[] pos2, 
-							Class<? extends Key<?>>[] types1, Class<? extends Key<?>>[] types2)
-	{
-		if (pos1.length != pos2.length || types1.length != types2.length) {
-			throw new IllegalArgumentException(
-				"The given pair of RecordComparators does not operate on the same number of fields.");
-		}
-		for (int i = 0; i < types1.length; i++) {
-			if (!types1[i].equals(types2[i])) {
-				throw new IllegalArgumentException(
-				"The given pair of RecordComparators does not operates on different data types.");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java
deleted file mode 100644
index a14e931..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime.record;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Record;
-
-
-/**
- * Implementation of the (de)serialization and copying logic for the {@link Record}.
- */
-public final class RecordSerializer extends TypeSerializer<Record> {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final RecordSerializer INSTANCE = new RecordSerializer(); // singleton instance
-	
-	private static final int MAX_BIT = 0x80;	// byte where only the most significant bit is set
-	
-	// --------------------------------------------------------------------------------------------
-
-	public static final RecordSerializer get() {
-		return INSTANCE;
-	}
-	
-	/**
-	 * Creates a new instance of the RecordSerializers. Private to prevent instantiation.
-	 */
-	private RecordSerializer() {}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
-	public Record createInstance() {
-		return new Record(); 
-	}
-
-	@Override
-	public Record copy(Record from, Record reuse) {
-		from.copyTo(reuse);
-		return reuse;
-	}
-	
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void serialize(Record record, DataOutputView target) throws IOException {
-		record.serialize(target);
-	}
-
-	@Override
-	public Record deserialize(Record target, DataInputView source) throws IOException {
-		target.deserialize(source);
-		return target;
-	}
-	
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int val = source.readUnsignedByte();
-		target.writeByte(val);
-		
-		if (val >= MAX_BIT) {
-			int shift = 7;
-			int curr;
-			val = val & 0x7f;
-			while ((curr = source.readUnsignedByte()) >= MAX_BIT) {
-				target.writeByte(curr);
-				val |= (curr & 0x7f) << shift;
-				shift += 7;
-			}
-			target.writeByte(curr);
-			val |= curr << shift;
-		}
-		
-		target.write(source, val);
-	}
-}


Mime
View raw message