Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1C35D1105B for ; Mon, 22 Sep 2014 15:40:38 +0000 (UTC) Received: (qmail 93094 invoked by uid 500); 22 Sep 2014 15:40:38 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 93036 invoked by uid 500); 22 Sep 2014 15:40:38 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 93020 invoked by uid 99); 22 Sep 2014 15:40:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Sep 2014 15:40:37 +0000 X-ASF-Spam-Status: No, hits=-2000.8 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 22 Sep 2014 15:40:32 +0000 Received: (qmail 92794 invoked by uid 99); 22 Sep 2014 15:40:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Sep 2014 15:40:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2C65E9D1236; Mon, 22 Sep 2014 15:40:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Mon, 22 Sep 2014 15:40:13 -0000 Message-Id: <25abd5c8f42f4610a656e99074ffa1c2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] [FLINK-1111] Move Basic and Array Type Information into "flink-core" Project X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java index 6e92d44..465ef83 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java @@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; public class PlanLeftUnwrappingJoinOperator http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java index 4de7311..7a59570 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.types.TypeInformation; public class PlanProjectOperator extends MapOperatorBase> { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java index 894a4a2..eeeb6c4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java @@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; public class PlanRightUnwrappingCoGroupOperator http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java index c2973b7..18d263f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java @@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; public class PlanRightUnwrappingJoinOperator http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index c222ff2..22cbce7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -22,10 +22,10 @@ import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java index 4da981c..54558ae 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java @@ -21,9 +21,9 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.types.TypeInformation; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java index 588c910..e4b815b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/FileDataSink.java @@ -27,9 +27,9 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.FileDataSinkBase; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.java.typeutils.RecordTypeInfo; import org.apache.flink.types.Nothing; -import org.apache.flink.types.NothingTypeInfo; import org.apache.flink.types.Record; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java index f47b53e..3ea8157 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/GenericDataSink.java @@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GenericDataSinkBase; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.java.typeutils.RecordTypeInfo; import org.apache.flink.types.Nothing; -import org.apache.flink.types.NothingTypeInfo; import org.apache.flink.types.Record; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java deleted file mode 100644 index 9ccda5e..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AtomicType.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.typeutils.TypeComparator; - - -/** - * - */ -public interface AtomicType { - - TypeComparator createComparator(boolean sortOrderAscending); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java deleted file mode 100644 index 2518590..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicArrayTypeInfo.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer; -import org.apache.flink.api.java.functions.InvalidTypesException; -import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer; -import org.apache.flink.types.TypeInformation; - -public class BasicArrayTypeInfo extends TypeInformation { - - public static final BasicArrayTypeInfo STRING_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(String[].class, BasicTypeInfo.STRING_TYPE_INFO); - - public static final BasicArrayTypeInfo BOOLEAN_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Boolean[].class, BasicTypeInfo.BOOLEAN_TYPE_INFO); - public static final BasicArrayTypeInfo BYTE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Byte[].class, BasicTypeInfo.BYTE_TYPE_INFO); - public static final BasicArrayTypeInfo SHORT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Short[].class, BasicTypeInfo.SHORT_TYPE_INFO); - public static final BasicArrayTypeInfo INT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Integer[].class, BasicTypeInfo.INT_TYPE_INFO); - public static final BasicArrayTypeInfo LONG_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Long[].class, BasicTypeInfo.LONG_TYPE_INFO); - public static final BasicArrayTypeInfo FLOAT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Float[].class, BasicTypeInfo.FLOAT_TYPE_INFO); - public static final BasicArrayTypeInfo DOUBLE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Double[].class, BasicTypeInfo.DOUBLE_TYPE_INFO); - public static final BasicArrayTypeInfo CHAR_ARRAY_TYPE_INFO = new BasicArrayTypeInfo(Character[].class, BasicTypeInfo.CHAR_TYPE_INFO); - - // -------------------------------------------------------------------------------------------- - - private final Class arrayClass; - private final Class componentClass; - private final TypeInformation componentInfo; - - @SuppressWarnings("unchecked") - private BasicArrayTypeInfo(Class arrayClass, BasicTypeInfo componentInfo) { - this.arrayClass = arrayClass; - this.componentClass = (Class) arrayClass.getComponentType(); - this.componentInfo = componentInfo; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public Class getTypeClass() { - return this.arrayClass; - } - - public Class getComponentTypeClass() { - return this.componentClass; - } - - public TypeInformation getComponentInfo() { - return componentInfo; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer createSerializer() { - // special case the string array - if (componentClass.equals(String.class)) { - return (TypeSerializer) StringArraySerializer.INSTANCE; - } else { - return (TypeSerializer) new GenericArraySerializer(this.componentClass, this.componentInfo.createSerializer()); - } - } - - @Override - public String toString() { - return this.getClass().getSimpleName()+"<"+this.componentInfo+">"; - } - - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("unchecked") - public static BasicArrayTypeInfo getInfoFor(Class type) { - if (!type.isArray()) { - throw new InvalidTypesException("The given class is no array."); - } - - // basic type arrays - return (BasicArrayTypeInfo) TYPES.get(type); - } - - private static final Map, BasicArrayTypeInfo> TYPES = new HashMap, BasicArrayTypeInfo>(); - - static { - TYPES.put(String[].class, STRING_ARRAY_TYPE_INFO); - TYPES.put(Boolean[].class, BOOLEAN_ARRAY_TYPE_INFO); - TYPES.put(Byte[].class, BYTE_ARRAY_TYPE_INFO); - TYPES.put(Short[].class, SHORT_ARRAY_TYPE_INFO); - TYPES.put(Integer[].class, INT_ARRAY_TYPE_INFO); - TYPES.put(Long[].class, LONG_ARRAY_TYPE_INFO); - TYPES.put(Float[].class, FLOAT_ARRAY_TYPE_INFO); - TYPES.put(Double[].class, DOUBLE_ARRAY_TYPE_INFO); - TYPES.put(Character[].class, CHAR_ARRAY_TYPE_INFO); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java deleted file mode 100644 index d54c18f..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/BasicTypeInfo.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.BooleanComparator; -import org.apache.flink.api.common.typeutils.base.BooleanSerializer; -import org.apache.flink.api.common.typeutils.base.ByteComparator; -import org.apache.flink.api.common.typeutils.base.ByteSerializer; -import org.apache.flink.api.common.typeutils.base.CharComparator; -import org.apache.flink.api.common.typeutils.base.CharSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.FloatComparator; -import org.apache.flink.api.common.typeutils.base.FloatSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.ShortComparator; -import org.apache.flink.api.common.typeutils.base.ShortSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.types.TypeInformation; - - -/** - * - */ -public class BasicTypeInfo extends TypeInformation implements AtomicType { - - public static final BasicTypeInfo STRING_TYPE_INFO = new BasicTypeInfo(String.class, StringSerializer.INSTANCE, StringComparator.class); - public static final BasicTypeInfo BOOLEAN_TYPE_INFO = new BasicTypeInfo(Boolean.class, BooleanSerializer.INSTANCE, BooleanComparator.class); - public static final BasicTypeInfo BYTE_TYPE_INFO = new BasicTypeInfo(Byte.class, ByteSerializer.INSTANCE, ByteComparator.class); - public static final BasicTypeInfo SHORT_TYPE_INFO = new BasicTypeInfo(Short.class, ShortSerializer.INSTANCE, ShortComparator.class); - public static final BasicTypeInfo INT_TYPE_INFO = new BasicTypeInfo(Integer.class, IntSerializer.INSTANCE, IntComparator.class); - public static final BasicTypeInfo LONG_TYPE_INFO = new BasicTypeInfo(Long.class, LongSerializer.INSTANCE, LongComparator.class); - public static final BasicTypeInfo FLOAT_TYPE_INFO = new BasicTypeInfo(Float.class, FloatSerializer.INSTANCE, FloatComparator.class); - public static final BasicTypeInfo DOUBLE_TYPE_INFO = new BasicTypeInfo(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class); - public static final BasicTypeInfo CHAR_TYPE_INFO = new BasicTypeInfo(Character.class, CharSerializer.INSTANCE, CharComparator.class); - - // -------------------------------------------------------------------------------------------- - - private final Class clazz; - - private final TypeSerializer serializer; - - private final Class> comparatorClass; - - - private BasicTypeInfo(Class clazz, TypeSerializer serializer, Class> comparatorClass) { - this.clazz = clazz; - this.serializer = serializer; - this.comparatorClass = comparatorClass; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isBasicType() { - return true; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public Class getTypeClass() { - return this.clazz; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - public TypeSerializer createSerializer() { - return this.serializer; - } - - @Override - public TypeComparator createComparator(boolean sortOrderAscending) { - return instantiateComparator(comparatorClass, sortOrderAscending); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.clazz.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof BasicTypeInfo) { - @SuppressWarnings("unchecked") - BasicTypeInfo other = (BasicTypeInfo) obj; - return this.clazz.equals(other.clazz); - } else { - return false; - } - } - - @Override - public String toString() { - return clazz.getSimpleName(); - } - - // -------------------------------------------------------------------------------------------- - - public static BasicTypeInfo getInfoFor(Class type) { - if (type == null) { - throw new NullPointerException(); - } - - @SuppressWarnings("unchecked") - BasicTypeInfo info = (BasicTypeInfo) TYPES.get(type); - return info; - } - - private static TypeComparator instantiateComparator(Class> comparatorClass, boolean ascendingOrder) { - try { - Constructor> constructor = comparatorClass.getConstructor(boolean.class); - return constructor.newInstance(ascendingOrder); - } - catch (Exception e) { - throw new RuntimeException("Could not initialize basic comparator " + comparatorClass.getName(), e); - } - } - - private static final Map, BasicTypeInfo> TYPES = new HashMap, BasicTypeInfo>(); - - static { - TYPES.put(String.class, STRING_TYPE_INFO); - TYPES.put(Boolean.class, BOOLEAN_TYPE_INFO); - TYPES.put(boolean.class, BOOLEAN_TYPE_INFO); - TYPES.put(Byte.class, BYTE_TYPE_INFO); - TYPES.put(byte.class, BYTE_TYPE_INFO); - TYPES.put(Short.class, SHORT_TYPE_INFO); - TYPES.put(short.class, SHORT_TYPE_INFO); - TYPES.put(Integer.class, INT_TYPE_INFO); - TYPES.put(int.class, INT_TYPE_INFO); - TYPES.put(Long.class, LONG_TYPE_INFO); - TYPES.put(long.class, LONG_TYPE_INFO); - TYPES.put(Float.class, FLOAT_TYPE_INFO); - TYPES.put(float.class, FLOAT_TYPE_INFO); - TYPES.put(Double.class, DOUBLE_TYPE_INFO); - TYPES.put(double.class, DOUBLE_TYPE_INFO); - TYPES.put(Character.class, CHAR_TYPE_INFO); - TYPES.put(char.class, CHAR_TYPE_INFO); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java deleted file mode 100644 index 579eb39..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/CompositeType.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.typeutils.TypeComparator; - - -/** - * - */ -public interface CompositeType { - - TypeComparator createComparator(int[] logicalKeyFields, boolean[] orders); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index cbc36ed..6cbefed 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -18,11 +18,12 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; -import org.apache.flink.types.TypeInformation; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java index b14f16d..d3a70a6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java @@ -19,7 +19,7 @@ package org.apache.flink.api.java.typeutils; -import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.common.typeinfo.TypeInformation; /** * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java index 0802280..e770898 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java @@ -22,11 +22,12 @@ import java.lang.reflect.GenericArrayType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.InvalidTypesException; +import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer; -import org.apache.flink.types.TypeInformation; public class ObjectArrayTypeInfo extends TypeInformation { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java index f30be89..105a275 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java @@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils; import java.lang.reflect.Field; -import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.common.typeinfo.TypeInformation; class PojoField { public Field field; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 9fddede..efdf152 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -26,11 +26,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.CompositeType; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.PojoComparator; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; -import org.apache.flink.types.TypeInformation; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java deleted file mode 100644 index fc4dea8..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PrimitiveArrayTypeInfo.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer; -import org.apache.flink.api.java.functions.InvalidTypesException; -import org.apache.flink.types.TypeInformation; - -public class PrimitiveArrayTypeInfo extends TypeInformation { - - public static final PrimitiveArrayTypeInfo BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(byte[].class, BytePrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(short[].class, ShortPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(int[].class, IntPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(long[].class, LongPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(float[].class, FloatPrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(double[].class, DoublePrimitiveArraySerializer.INSTANCE); - public static final PrimitiveArrayTypeInfo CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo(char[].class, CharPrimitiveArraySerializer.INSTANCE); - - // -------------------------------------------------------------------------------------------- - - private final Class arrayClass; - private final TypeSerializer serializer; - - private PrimitiveArrayTypeInfo(Class arrayClass, TypeSerializer serializer) { - this.arrayClass = arrayClass; - this.serializer = serializer; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public Class getTypeClass() { - return this.arrayClass; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - public TypeSerializer createSerializer() { - return this.serializer; - } - - @Override - public String toString() { - return arrayClass.getComponentType().getName() + "[]"; - } - - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("unchecked") - public static PrimitiveArrayTypeInfo getInfoFor(Class type) { - if (!type.isArray()) { - throw new InvalidTypesException("The given class is no array."); - } - - // basic type arrays - return (PrimitiveArrayTypeInfo) TYPES.get(type); - } - - private static final Map, PrimitiveArrayTypeInfo> TYPES = new HashMap, PrimitiveArrayTypeInfo>(); - - static { - TYPES.put(boolean[].class, BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(byte[].class, BYTE_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(short[].class, SHORT_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(int[].class, INT_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(long[].class, LONG_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(float[].class, FLOAT_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO); - TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java index 980b0f6..445fef9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java @@ -18,10 +18,10 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializer; +import org.apache.flink.api.common.typeutils.record.RecordSerializer; import org.apache.flink.types.Record; -import org.apache.flink.types.TypeInformation; /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java index 415a026..f20060e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.typeutils; -import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.common.typeinfo.TypeInformation; /** * This interface can be implemented by functions and input formats to tell the framework http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 25be7f1..1f0d1cd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -20,11 +20,13 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.TupleComparator; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; -import org.apache.flink.types.TypeInformation; //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.java.tuple.*; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index bca29dc..7701a1d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -20,7 +20,8 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; -import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.common.typeinfo.CompositeType; +import org.apache.flink.api.common.typeinfo.TypeInformation; public abstract class TupleTypeInfoBase extends TypeInformation implements CompositeType { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index a8a833f..3bc838c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -36,16 +36,19 @@ import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.java.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.types.TypeInformation; import org.apache.flink.types.Value; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index c73c79f..469b4ec 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -22,7 +22,10 @@ package org.apache.flink.api.java.typeutils; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.flink.types.TypeInformation; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.types.Value; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 0375af6..cf43988 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -18,15 +18,16 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.InvalidTypesException; import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueComparator; import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; import org.apache.flink.types.CopyableValue; -import org.apache.flink.types.TypeInformation; import org.apache.flink.types.Value; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java index a2e241f..d21c371 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java @@ -18,12 +18,13 @@ package org.apache.flink.api.java.typeutils; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.InvalidTypesException; import org.apache.flink.api.java.typeutils.runtime.WritableComparator; import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; -import org.apache.flink.types.TypeInformation; import org.apache.hadoop.io.Writable; public class WritableTypeInfo extends TypeInformation implements AtomicType { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java index 6911580..7969740 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java @@ -43,9 +43,9 @@ public class CopyableValueComparator & Comparable> private transient T tempReference; - private final Comparable[] extractedKey = new Comparable[1]; + private final Comparable[] extractedKey = new Comparable[1]; - private final TypeComparator[] comparators = new TypeComparator[] {this}; + private final TypeComparator[] comparators = new TypeComparator[] {this}; public CopyableValueComparator(boolean ascending, Class type) { this.type = type; @@ -132,7 +132,7 @@ public class CopyableValueComparator & Comparable> } @Override - public TypeComparator[] getComparators() { + public TypeComparator[] getComparators() { return comparators; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java deleted file mode 100644 index e67334d..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializer.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.lang.reflect.Array; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - - -/** - * @param The component type - */ -public class GenericArraySerializer extends TypeSerializer { - - private static final long serialVersionUID = 1L; - - private final Class componentClass; - - private final TypeSerializer componentSerializer; - - private final C[] EMPTY; - - - - public GenericArraySerializer(Class componentClass, TypeSerializer componentSerializer) { - if (componentClass == null || componentSerializer == null) { - throw new NullPointerException(); - } - - this.componentClass = componentClass; - this.componentSerializer = componentSerializer; - this.EMPTY = create(0); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public boolean isStateful() { - return this.componentSerializer.isStateful(); - } - - - @Override - public C[] createInstance() { - return EMPTY; - } - - @Override - public C[] copy(C[] from, C[] reuse) { - C[] copy = create(from.length); - - for (int i = 0; i < copy.length; i++) { - copy[i] = this.componentSerializer.copy(from[i], this.componentSerializer.createInstance()); - } - - return copy; - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(C[] value, DataOutputView target) throws IOException { - target.writeInt(value.length); - for (int i = 0; i < value.length; i++) { - C val = value[i]; - if (val == null) { - target.writeBoolean(false); - } else { - target.writeBoolean(true); - componentSerializer.serialize(val, target); - } - } - } - - @Override - public C[] deserialize(C[] reuse, DataInputView source) throws IOException { - int len = source.readInt(); - - if (reuse.length != len) { - reuse = create(len); - } - - for (int i = 0; i < len; i++) { - boolean isNonNull = source.readBoolean(); - if (isNonNull) { - reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source); - } else { - reuse[i] = null; - } - } - - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - int len = source.readInt(); - target.writeInt(len); - - for (int i = 0; i < len; i++) { - boolean isNonNull = source.readBoolean(); - target.writeBoolean(isNonNull); - - if (isNonNull) { - componentSerializer.copy(source, target); - } - } - } - - @SuppressWarnings("unchecked") - private final C[] create(int len) { - return (C[]) Array.newInstance(componentClass, len); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return componentClass.hashCode() + componentSerializer.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof GenericArraySerializer) { - GenericArraySerializer other = (GenericArraySerializer) obj; - return this.componentClass == other.componentClass && - this.componentSerializer.equals(other.componentSerializer); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java deleted file mode 100644 index bf8d56c..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java +++ /dev/null @@ -1,422 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime.record; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.Key; -import org.apache.flink.types.KeyFieldOutOfBoundsException; -import org.apache.flink.types.NormalizableKey; -import org.apache.flink.types.NullKeyFieldException; -import org.apache.flink.types.Record; -import org.apache.flink.util.InstantiationUtil; - - -/** - * Implementation of the {@link TypeComparator} interface for the pact record. Instances of this class - * are parameterized with which fields are relevant to the comparison. - */ -public final class RecordComparator extends TypeComparator { - - private static final long serialVersionUID = 1L; - - /** - * A sequence of prime numbers to be used for salting the computed hash values. - * Based on some empirical evidence, we are using a 32-element subsequence of the - * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime). - * - * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers - * @see: http://oeis.org/A068652 - */ - private static final int[] HASH_SALT = new int[] { - 73 , 79 , 97 , 113 , 131 , 197 , 199 , 311 , - 337 , 373 , 719 , 733 , 919 , 971 , 991 , 1193 , - 1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , - 19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 }; - - private final int[] keyFields; - - @SuppressWarnings("rawtypes") - private final Key[] keyHolders, transientKeyHolders; - - private final Record temp1, temp2; - - private final boolean[] ascending; - - private final int[] normalizedKeyLengths; - - private final int numLeadingNormalizableKeys; - - private final int normalizableKeyPrefixLen; - - - /** - * Creates a new comparator that compares Pact Records by the subset of fields as described - * by the given key positions and types. All order comparisons will assume ascending order on all fields. - * - * @param keyFields The positions of the key fields. - * @param keyTypes The types (classes) of the key fields. - */ - public RecordComparator(int[] keyFields, Class>[] keyTypes) { - this(keyFields, keyTypes, null); - } - - /** - * Creates a new comparator that compares Pact Records by the subset of fields as described - * by the given key positions and types. - * - * @param keyFields The positions of the key fields. - * @param keyTypes The types (classes) of the key fields. - * @param sortDirection The direction for sorting. A value of true indicates ascending for an attribute, - * a value of false indicated descending. If the parameter is null, then - * all order comparisons will assume ascending order on all fields. - */ - public RecordComparator(int[] keyFields, Class>[] keyTypes, boolean[] sortDirection) { - this.keyFields = keyFields; - - // instantiate fields to extract keys into - this.keyHolders = new Key[keyTypes.length]; - this.transientKeyHolders = new Key[keyTypes.length]; - for (int i = 0; i < keyTypes.length; i++) { - if (keyTypes[i] == null) { - throw new NullPointerException("Key type " + i + " is null."); - } - this.keyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class); - this.transientKeyHolders[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class); - } - - // set up auxiliary fields for normalized key support - this.normalizedKeyLengths = new int[keyFields.length]; - int nKeys = 0; - int nKeyLen = 0; - boolean inverted = false; - for (int i = 0; i < this.keyHolders.length; i++) { - Key k = this.keyHolders[i]; - if (k instanceof NormalizableKey) { - if (sortDirection != null) { - if (sortDirection[i] && inverted) { - break; - } else if (i == 0 && !sortDirection[0]) { - inverted = true; - } - } - nKeys++; - final int len = ((NormalizableKey) k).getMaxNormalizedKeyLen(); - if (len < 0) { - throw new RuntimeException("Data type " + k.getClass().getName() + - " specifies an invalid length for the normalized key: " + len); - } - this.normalizedKeyLengths[i] = len; - nKeyLen += this.normalizedKeyLengths[i]; - if (nKeyLen < 0) { - nKeyLen = Integer.MAX_VALUE; - break; - } - } else { - break; - } - } - this.numLeadingNormalizableKeys = nKeys; - this.normalizableKeyPrefixLen = nKeyLen; - - this.temp1 = new Record(); - this.temp2 = new Record(); - - if (sortDirection != null) { - this.ascending = sortDirection; - } else { - this.ascending = new boolean[keyFields.length]; - for (int i = 0; i < this.ascending.length; i++) { - this.ascending[i] = true; - } - } - } - - /** - * Copy constructor. - * - * @param toCopy Comparator to copy. - */ - private RecordComparator(RecordComparator toCopy) { - this.keyFields = toCopy.keyFields; - this.keyHolders = new Key[toCopy.keyHolders.length]; - this.transientKeyHolders = new Key[toCopy.keyHolders.length]; - - try { - for (int i = 0; i < this.keyHolders.length; i++) { - this.keyHolders[i] = toCopy.keyHolders[i].getClass().newInstance(); - this.transientKeyHolders[i] = toCopy.keyHolders[i].getClass().newInstance(); - } - } catch (Exception ex) { - // this should never happen, because the classes have been instantiated before. Report for debugging. - throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex); - } - - this.normalizedKeyLengths = toCopy.normalizedKeyLengths; - this.numLeadingNormalizableKeys = toCopy.numLeadingNormalizableKeys; - this.normalizableKeyPrefixLen = toCopy.normalizableKeyPrefixLen; - this.ascending = toCopy.ascending; - - this.temp1 = new Record(); - this.temp2 = new Record(); - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public int hash(Record object) { - int i = 0; - try { - int code = 0; - for (; i < this.keyFields.length; i++) { - code ^= object.getField(this.keyFields[i], this.transientKeyHolders[i]).hashCode(); - code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component - } - return code; - } - catch (NullPointerException npex) { - throw new NullKeyFieldException(this.keyFields[i]); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(this.keyFields[i]); - } - } - - - @Override - public void setReference(Record toCompare) { - for (int i = 0; i < this.keyFields.length; i++) { - if (!toCompare.getFieldInto(this.keyFields[i], this.keyHolders[i])) { - throw new NullKeyFieldException(this.keyFields[i]); - } - } - } - - - @Override - public boolean equalToReference(Record candidate) { - for (int i = 0; i < this.keyFields.length; i++) { - final Key k = candidate.getField(this.keyFields[i], this.transientKeyHolders[i]); - if (k == null) { - throw new NullKeyFieldException(this.keyFields[i]); - } else if (!k.equals(this.keyHolders[i])) { - return false; - } - } - return true; - } - - - @Override - public int compareToReference(TypeComparator referencedAccessors) { - final RecordComparator pra = (RecordComparator) referencedAccessors; - - for (int i = 0; i < this.keyFields.length; i++) { - @SuppressWarnings("unchecked") - final int comp = pra.keyHolders[i].compareTo(this.keyHolders[i]); - if (comp != 0) { - return this.ascending[i] ? comp : -comp; - } - } - return 0; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public int compare(Record first, Record second) { - int i = 0; - try { - for (; i < this.keyFields.length; i++) { - Key k1 = first.getField(this.keyFields[i], this.keyHolders[i]); - Key k2 = second.getField(this.keyFields[i], this.transientKeyHolders[i]); - int cmp = k1.compareTo(k2); - if (cmp != 0) { - return cmp; - } - } - return 0; - } - catch (NullPointerException e) { - throw new NullKeyFieldException(this.keyFields[i]); - } - } - - @Override - public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException { - this.temp1.read(source1); - this.temp2.read(source2); - - for (int i = 0; i < this.keyFields.length; i++) { - @SuppressWarnings("rawtypes") - final Key k1 = this.temp1.getField(this.keyFields[i], this.keyHolders[i]); - @SuppressWarnings("rawtypes") - final Key k2 = this.temp2.getField(this.keyFields[i], this.transientKeyHolders[i]); - - if (k1 == null || k2 == null) { - throw new NullKeyFieldException(this.keyFields[i]); - } - - @SuppressWarnings("unchecked") - final int comp = k1.compareTo(k2); - if (comp != 0) { - return this.ascending[i] ? comp : -comp; - } - } - return 0; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public boolean supportsNormalizedKey() { - return this.numLeadingNormalizableKeys > 0; - } - - - @Override - public int getNormalizeKeyLen() { - return this.normalizableKeyPrefixLen; - } - - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return this.numLeadingNormalizableKeys < this.keyFields.length || - this.normalizableKeyPrefixLen == Integer.MAX_VALUE || - this.normalizableKeyPrefixLen > keyBytes; - } - - @Override - public void putNormalizedKey(Record record, MemorySegment target, int offset, int numBytes) { - int i = 0; - try { - for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++) - { - int len = this.normalizedKeyLengths[i]; - len = numBytes >= len ? len : numBytes; - ((NormalizableKey) record.getField(this.keyFields[i], this.transientKeyHolders[i])).copyNormalizedKey(target, offset, len); - numBytes -= len; - offset += len; - } - } - catch (NullPointerException npex) { - throw new NullKeyFieldException(this.keyFields[i]); - } - } - - - @Override - public boolean invertNormalizedKey() { - return !this.ascending[0]; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public void writeWithKeyNormalization(Record record, DataOutputView target) { - throw new UnsupportedOperationException(); - } - - @Override - public Record readWithKeyDenormalization(Record reuse, DataInputView source) { - throw new UnsupportedOperationException(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public RecordComparator duplicate() { - return new RecordComparator(this); - } - - // -------------------------------------------------------------------------------------------- - // Non Standard Comparator Methods - // -------------------------------------------------------------------------------------------- - - public final int[] getKeyPositions() { - return this.keyFields; - } - - @SuppressWarnings("unchecked") - public final Class>[] getKeyTypes() { - final Class>[] keyTypes = new Class[this.keyHolders.length]; - for (int i = 0; i < keyTypes.length; i++) { - keyTypes[i] = (Class>) this.keyHolders[i].getClass(); - } - return keyTypes; - } - - public final Key[] getKeysAsCopy(Record record) { - try { - final Key[] keys = new Key[this.keyFields.length]; - for (int i = 0; i < keys.length; i++) { - keys[i] = this.keyHolders[i].getClass().newInstance(); - } - if(!record.getFieldsInto(this.keyFields, keys)) { - throw new RuntimeException("Could not extract keys from record."); - } - return keys; - } catch (Exception ex) { - // this should never happen, because the classes have been instantiated before. Report for debugging. - throw new RuntimeException("Could not instantiate key classes when duplicating RecordComparator.", ex); - } - } - - @Override - public Object[] extractKeys(Record record) { - throw new UnsupportedOperationException("Record does not support extactKeys and " + - "getComparators. This cannot be used with the GenericPairComparator."); - } - - @Override - public TypeComparator[] getComparators() { - throw new UnsupportedOperationException("Record does not support extactKeys and " + - "getComparators. This cannot be used with the GenericPairComparator."); - } - - @Override - public boolean supportsCompareAgainstReference() { - return true; - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public final int compareAgainstReference(Comparable[] keys) { - for (int i = 0; i < this.keyFields.length; i++) - { - final int comp = keys[i].compareTo(this.keyHolders[i]); - if (comp != 0) { - return this.ascending[i] ? comp : -comp; - } - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java deleted file mode 100644 index 5a30969..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparatorFactory.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime.record; - -import java.util.Arrays; - -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; - -/** - * A factory for a {@link org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}. The comparator uses a subset of - * the fields for the comparison. That subset of fields (positions and types) is read from the - * supplied configuration. - */ -public class RecordComparatorFactory implements TypeComparatorFactory { - - private static final String NUM_KEYS = "numkeys"; - - private static final String KEY_POS_PREFIX = "keypos."; - - private static final String KEY_CLASS_PREFIX = "keyclass."; - - private static final String KEY_SORT_DIRECTION_PREFIX = "key-direction."; - - // -------------------------------------------------------------------------------------------- - - private int[] positions; - - private Class>[] types; - - private boolean[] sortDirections; - - // -------------------------------------------------------------------------------------------- - - public RecordComparatorFactory() { - // do nothing, allow to be configured via config - } - - public RecordComparatorFactory(int[] positions, Class>[] types) { - this(positions, types, null); - } - - public RecordComparatorFactory(int[] positions, Class>[] types, boolean[] sortDirections) { - if (positions == null || types == null) { - throw new NullPointerException(); - } - if (positions.length != types.length) { - throw new IllegalArgumentException(); - } - - this.positions = positions; - this.types = types; - - if (sortDirections == null) { - this.sortDirections = new boolean[positions.length]; - Arrays.fill(this.sortDirections, true); - } else if (sortDirections.length != positions.length) { - throw new IllegalArgumentException(); - } else { - this.sortDirections = sortDirections; - } - } - - - @Override - public void writeParametersToConfig(Configuration config) { - for (int i = 0; i < this.positions.length; i++) { - if (this.positions[i] < 0) { - throw new IllegalArgumentException("The key position " + i + " is invalid: " + this.positions[i]); - } - if (this.types[i] == null || !Key.class.isAssignableFrom(this.types[i])) { - throw new IllegalArgumentException("The key type " + i + " is null or not implenting the interface " + - Key.class.getName() + "."); - } - } - - // write the config - config.setInteger(NUM_KEYS, this.positions.length); - for (int i = 0; i < this.positions.length; i++) { - config.setInteger(KEY_POS_PREFIX + i, this.positions[i]); - config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName()); - config.setBoolean(KEY_SORT_DIRECTION_PREFIX + i, this.sortDirections[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - // figure out how many key fields there are - final int numKeyFields = config.getInteger(NUM_KEYS, -1); - if (numKeyFields < 0) { - throw new IllegalConfigurationException("The number of keys for the comparator is invalid: " + numKeyFields); - } - - final int[] positions = new int[numKeyFields]; - final Class>[] types = new Class[numKeyFields]; - final boolean[] direction = new boolean[numKeyFields]; - - // read the individual key positions and types - for (int i = 0; i < numKeyFields; i++) { - // next key position - final int p = config.getInteger(KEY_POS_PREFIX + i, -1); - if (p >= 0) { - positions[i] = p; - } else { - throw new IllegalConfigurationException("Contained invalid position for key no positions for keys."); - } - - // next key type - final String name = config.getString(KEY_CLASS_PREFIX + i, null); - if (name != null) { - types[i] = (Class>) Class.forName(name, true, cl).asSubclass(Key.class); - } else { - throw new IllegalConfigurationException("The key type (" + i + - ") for the comparator is null"); - } - - // next key sort direction - direction[i] = config.getBoolean(KEY_SORT_DIRECTION_PREFIX + i, true); - } - - this.positions = positions; - this.types = types; - this.sortDirections = direction; - } - - - @Override - public RecordComparator createComparator() { - return new RecordComparator(this.positions, this.types, this.sortDirections); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java deleted file mode 100644 index 807814d..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparator.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime.record; - -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.types.Key; -import org.apache.flink.types.NullKeyFieldException; -import org.apache.flink.types.Record; -import org.apache.flink.util.InstantiationUtil; - - -/** - * Implementation of the {@link TypePairComparator} interface for Pact Records. The equality is established on a set of - * key fields. The indices of the key fields may be different on the reference and candidate side. - */ -public class RecordPairComparator extends TypePairComparator { - - private final int[] keyFields1, keyFields2; // arrays with the positions of the keys in the records - - @SuppressWarnings("rawtypes") - private final Key[] keyHolders1, keyHolders2; // arrays with mutable objects for the key types - - - public RecordPairComparator(int[] keyFieldsReference, int[] keyFieldsCandidate, Class>[] keyTypes) { - if (keyFieldsReference.length != keyFieldsCandidate.length || keyFieldsCandidate.length != keyTypes.length) { - throw new IllegalArgumentException( - "The arrays describing the key positions and types must be of the same length."); - } - this.keyFields1 = keyFieldsReference; - this.keyFields2 = keyFieldsCandidate; - - // instantiate fields to extract keys into - this.keyHolders1 = new Key[keyTypes.length]; - this.keyHolders2 = new Key[keyTypes.length]; - - for (int i = 0; i < keyTypes.length; i++) { - if (keyTypes[i] == null) { - throw new NullPointerException("Key type " + i + " is null."); - } - this.keyHolders1[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class); - this.keyHolders2[i] = InstantiationUtil.instantiate(keyTypes[i], Key.class); - } - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void setReference(Record reference) { - for (int i = 0; i < this.keyFields1.length; i++) { - if (!reference.getFieldInto(this.keyFields1[i], this.keyHolders1[i])) { - throw new NullKeyFieldException(this.keyFields1[i]); - } - } - } - - - @SuppressWarnings("rawtypes") - @Override - public boolean equalToReference(Record candidate) { - for (int i = 0; i < this.keyFields2.length; i++) { - final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]); - if (k == null) { - throw new NullKeyFieldException(this.keyFields2[i]); - } else if (!k.equals(this.keyHolders1[i])) { - return false; - } - } - return true; - } - - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public int compareToReference(Record candidate) { - for (int i = 0; i < this.keyFields2.length; i++) { - final Key k = candidate.getField(this.keyFields2[i], this.keyHolders2[i]); - if (k == null) { - throw new NullKeyFieldException(this.keyFields2[i]); - } else { - final int comp = k.compareTo(this.keyHolders1[i]); - if (comp != 0) { - return comp; - } - } - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java deleted file mode 100644 index 0df584e..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordPairComparatorFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime.record; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; - -/** - * A factory for a {@link TypePairComparator} for {@link Record}. The comparator uses a subset of - * the fields for the comparison. That subset of fields (positions and types) is read from the - * supplied configuration. - */ -public class RecordPairComparatorFactory implements TypePairComparatorFactory { - - private static final RecordPairComparatorFactory INSTANCE = new RecordPairComparatorFactory(); - - /** - * Gets an instance of the comparator factory. The instance is shared, since the factory is a - * stateless class. - * - * @return An instance of the comparator factory. - */ - public static final RecordPairComparatorFactory get() { - return INSTANCE; - } - - @Override - public TypePairComparator createComparator12( - TypeComparator comparator1, TypeComparator comparator2) - { - if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) { - throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators."); - } - final RecordComparator prc1 = (RecordComparator) comparator1; - final RecordComparator prc2 = (RecordComparator) comparator2; - - final int[] pos1 = prc1.getKeyPositions(); - final int[] pos2 = prc2.getKeyPositions(); - - final Class>[] types1 = prc1.getKeyTypes(); - final Class>[] types2 = prc2.getKeyTypes(); - - checkComparators(pos1, pos2, types1, types2); - - return new RecordPairComparator(pos1, pos2, types1); - } - - @Override - public TypePairComparator createComparator21( - TypeComparator comparator1, TypeComparator comparator2) - { - if (!(comparator1 instanceof RecordComparator && comparator2 instanceof RecordComparator)) { - throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparators."); - } - final RecordComparator prc1 = (RecordComparator) comparator1; - final RecordComparator prc2 = (RecordComparator) comparator2; - - final int[] pos1 = prc1.getKeyPositions(); - final int[] pos2 = prc2.getKeyPositions(); - - final Class>[] types1 = prc1.getKeyTypes(); - final Class>[] types2 = prc2.getKeyTypes(); - - checkComparators(pos1, pos2, types1, types2); - - return new RecordPairComparator(pos2, pos1, types1); - } - - // -------------------------------------------------------------------------------------------- - - private static final void checkComparators(int[] pos1, int[] pos2, - Class>[] types1, Class>[] types2) - { - if (pos1.length != pos2.length || types1.length != types2.length) { - throw new IllegalArgumentException( - "The given pair of RecordComparators does not operate on the same number of fields."); - } - for (int i = 0; i < types1.length; i++) { - if (!types1[i].equals(types2[i])) { - throw new IllegalArgumentException( - "The given pair of RecordComparators does not operates on different data types."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java deleted file mode 100644 index a14e931..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordSerializer.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime.record; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Record; - - -/** - * Implementation of the (de)serialization and copying logic for the {@link Record}. - */ -public final class RecordSerializer extends TypeSerializer { - - private static final long serialVersionUID = 1L; - - private static final RecordSerializer INSTANCE = new RecordSerializer(); // singleton instance - - private static final int MAX_BIT = 0x80; // byte where only the most significant bit is set - - // -------------------------------------------------------------------------------------------- - - public static final RecordSerializer get() { - return INSTANCE; - } - - /** - * Creates a new instance of the RecordSerializers. Private to prevent instantiation. - */ - private RecordSerializer() {} - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public boolean isStateful() { - return false; - } - - @Override - public Record createInstance() { - return new Record(); - } - - @Override - public Record copy(Record from, Record reuse) { - from.copyTo(reuse); - return reuse; - } - - - @Override - public int getLength() { - return -1; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void serialize(Record record, DataOutputView target) throws IOException { - record.serialize(target); - } - - @Override - public Record deserialize(Record target, DataInputView source) throws IOException { - target.deserialize(source); - return target; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - int val = source.readUnsignedByte(); - target.writeByte(val); - - if (val >= MAX_BIT) { - int shift = 7; - int curr; - val = val & 0x7f; - while ((curr = source.readUnsignedByte()) >= MAX_BIT) { - target.writeByte(curr); - val |= (curr & 0x7f) << shift; - shift += 7; - } - target.writeByte(curr); - val |= curr << shift; - } - - target.write(source, val); - } -}