From: sewen@apache.org
To: commits@flink.apache.org
Date: Tue, 02 Feb 2016 17:23:19 -0000
Subject: [19/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
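The runtime classes added below are normally obtained through the type-extraction stack rather than constructed directly. A minimal sketch (MyPojo is a hypothetical POJO class; the calls and imports mirror those used inside PojoSerializer further down):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    ExecutionConfig config = new ExecutionConfig();
    TypeInformation<MyPojo> typeInfo = TypeExtractor.createTypeInfo(MyPojo.class);
    TypeSerializer<MyPojo> serializer = typeInfo.createSerializer(config);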
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
new file mode 100644
index 0000000..c0c7797
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.InstantiationUtil;
+
+
+public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// Reflection fields for the comparison key fields
+	private transient Field[] keyFields;
+
+	private final TypeComparator<Object>[] comparators;
+
+	private final int[] normalizedKeyLengths;
+
+	private final int numLeadingNormalizableKeys;
+
+	private final int normalizableKeyPrefixLen;
+
+	private final boolean invertNormKey;
+
+	private TypeSerializer<T> serializer;
+
+	private final Class<T> type;
+
+	@SuppressWarnings("unchecked")
+	public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
+		this.keyFields = keyFields;
+		this.comparators = (TypeComparator<Object>[]) comparators;
+
+		this.type = type;
+		this.serializer = serializer;
+
+		// set up auxiliary fields for normalized key support
+		this.normalizedKeyLengths = new int[keyFields.length];
+		int nKeys = 0;
+		int nKeyLen = 0;
+		boolean inverted = false;
+
+		for (int i = 0; i < this.comparators.length; i++) {
+			TypeComparator<?> k = this.comparators[i];
+			if (k == null) {
+				throw new IllegalArgumentException("One of the passed comparators is null");
+			}
+			if (keyFields[i] == null) {
+				throw new IllegalArgumentException("One of the passed reflection fields is null");
+			}
+
+			// as long as the leading keys support normalized keys, we can build up the composite key
+			if (k.supportsNormalizedKey()) {
+				if (i == 0) {
+					// the first comparator decides whether we need to invert the key direction
+					inverted = k.invertNormalizedKey();
+				}
+				else if (k.invertNormalizedKey() != inverted) {
+					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+					break;
+				}
+
+				nKeys++;
+				final int len = k.getNormalizeKeyLen();
+				if (len < 0) {
+					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+				}
+				this.normalizedKeyLengths[i] = len;
+				nKeyLen += this.normalizedKeyLengths[i];
+
+				if (nKeyLen < 0) {
+					// overflow, which means we are out of budget for normalized key space anyway
+					nKeyLen = Integer.MAX_VALUE;
+					break;
+				}
+			} else {
+				break;
+			}
+		}
+		this.numLeadingNormalizableKeys = nKeys;
+		this.normalizableKeyPrefixLen = nKeyLen;
+		this.invertNormKey = inverted;
+	}
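A worked example of the normalized-key scan in the constructor above, with assumed key types:

    // int key:   supportsNormalizedKey() == true, getNormalizeKeyLen() == 4
    // long key:  supportsNormalizedKey() == true, getNormalizeKeyLen() == 8
    // third key: supportsNormalizedKey() == false  -> loop breaks here
    // result: numLeadingNormalizableKeys = 2, normalizableKeyPrefixLen = 4 + 8 = 12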
+
+	@SuppressWarnings("unchecked")
+	private PojoComparator(PojoComparator<T> toClone) {
+		this.keyFields = toClone.keyFields;
+		this.comparators = new TypeComparator[toClone.comparators.length];
+
+		for (int i = 0; i < toClone.comparators.length; i++) {
+			this.comparators[i] = toClone.comparators[i].duplicate();
+		}
+
+		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+		this.invertNormKey = toClone.invertNormKey;
+
+		this.type = toClone.type;
+
+		try {
+			this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
+		} catch (IOException e) {
+			throw new RuntimeException("Cannot copy serializer", e);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Cannot copy serializer", e);
+		}
+	}
+
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeInt(keyFields.length);
+		for (Field field: keyFields) {
+			out.writeObject(field.getDeclaringClass());
+			out.writeUTF(field.getName());
+		}
+	}
+
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		int numKeyFields = in.readInt();
+		keyFields = new Field[numKeyFields];
+		for (int i = 0; i < numKeyFields; i++) {
+			Class<?> clazz = (Class<?>) in.readObject();
+			String fieldName = in.readUTF();
+			// try superclasses as well
+			while (clazz != null) {
+				try {
+					Field field = clazz.getDeclaredField(fieldName);
+					field.setAccessible(true);
+					keyFields[i] = field;
+					break;
+				} catch (NoSuchFieldException e) {
+					clazz = clazz.getSuperclass();
+				}
+			}
+			if (keyFields[i] == null) {
+				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+						+ " (" + fieldName + ")");
+			}
+		}
+	}
+
+	public Field[] getKeyFields() {
+		return this.keyFields;
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) {
+		for (int i = 0; i < comparators.length; i++) {
+			if (comparators[i] instanceof CompositeTypeComparator) {
+				((CompositeTypeComparator) comparators[i]).getFlatComparator(flatComparators);
+			} else {
+				flatComparators.add(comparators[i]);
+			}
+		}
+	}
+
+	/**
+	 * This method handles the IllegalAccessException thrown by Field.get().
+	 */
+	public final Object accessField(Field field, Object object) {
+		try {
+			object = field.get(object);
+		} catch (NullPointerException npex) {
+			throw new NullKeyFieldException("Unable to access field " + field + " on object " + object);
+		} catch (IllegalAccessException iaex) {
+			throw new RuntimeException("This should not happen since we call setAccessible(true) in PojoTypeInfo."
+					+ " fields: " + field + " obj: " + object);
+		}
+		return object;
+	}
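The hash() method below mixes per-field hashes with entries of TupleComparatorBase.HASH_SALT (defined later in this commit), so that equal field hashes in different key positions yield different composite hashes. For two key fields with field hashes h0 and h1, the loop computes:

    // code = 0
    // i = 0: code = 0 * HASH_SALT[0] + h0 = h0
    // i = 1: code = h0 * HASH_SALT[1] + h1
    // Swapping h0 and h1 therefore changes the result, unlike a plain sum.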
+
+	@Override
+	public int hash(T value) {
+		int i = 0;
+		int code = 0;
+		for (; i < this.keyFields.length; i++) {
+			code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
+			try {
+				code += this.comparators[i].hash(accessField(keyFields[i], value));
+			} catch (NullPointerException npe) {
+				throw new RuntimeException("A NullPointerException occurred while accessing a key field in a POJO. "
+						+ "Most likely, the value grouped/joined on is null. Field name: " + keyFields[i].getName(), npe);
+			}
+		}
+		return code;
+	}
+
+	@Override
+	public void setReference(T toCompare) {
+		int i = 0;
+		for (; i < this.keyFields.length; i++) {
+			this.comparators[i].setReference(accessField(keyFields[i], toCompare));
+		}
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		int i = 0;
+		for (; i < this.keyFields.length; i++) {
+			if (!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		PojoComparator<T> other = (PojoComparator<T>) referencedComparator;
+
+		int i = 0;
+		try {
+			for (; i < this.keyFields.length; i++) {
+				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyFields[i].toString());
+		}
+	}
+
+	@Override
+	public int compare(T first, T second) {
+		int i = 0;
+		for (; i < keyFields.length; i++) {
+			int cmp = comparators[i].compare(accessField(keyFields[i], first), accessField(keyFields[i], second));
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return 0;
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		T first = this.serializer.createInstance();
+		T second = this.serializer.createInstance();
+
+		first = this.serializer.deserialize(first, firstSource);
+		second = this.serializer.deserialize(second, secondSource);
+
+		return this.compare(first, second);
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return this.numLeadingNormalizableKeys > 0;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return this.normalizableKeyPrefixLen;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return this.numLeadingNormalizableKeys < this.keyFields.length ||
+				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+				this.normalizableKeyPrefixLen > keyBytes;
+	}
+
+	@Override
+	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
+		int i = 0;
+		for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
+			int len = this.normalizedKeyLengths[i];
+			len = numBytes >= len ? len : numBytes;
+			this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len);
+			numBytes -= len;
+			offset += len;
+		}
+	}
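A worked example of the byte budgeting in putNormalizedKey() above: with normalizedKeyLengths = {4, 8} and numBytes = 10, the loop writes the full 4 bytes of key 0 and then only the remaining 6 bytes of key 1:

    // i = 0: len = min(4, 10) = 4, written at offset,     numBytes -> 6
    // i = 1: len = min(8, 6)  = 6, written at offset + 4, numBytes -> 0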
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return this.invertNormKey;
+	}
+
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public PojoComparator<T> duplicate() {
+		return new PojoComparator<T>(this);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		int localIndex = index;
+		for (int i = 0; i < comparators.length; i++) {
+			localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex);
+		}
+		return localIndex - index;
+	}
+
+	// --------------------------------------------------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
new file mode 100644
index 0000000..de24956
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + + +public final class PojoSerializer extends TypeSerializer { + + // Flags for the header + private static byte IS_NULL = 1; + private static byte NO_SUBCLASS = 2; + private static byte IS_SUBCLASS = 4; + private static byte IS_TAGGED_SUBCLASS = 8; + + private static final long serialVersionUID = 1L; + + private final Class clazz; + + private final TypeSerializer[] fieldSerializers; + + private final int numFields; + + private final Map, Integer> registeredClasses; + + private final TypeSerializer[] registeredSerializers; + + private final ExecutionConfig executionConfig; + + private transient Map, TypeSerializer> subclassSerializerCache; + private transient ClassLoader cl; + // We need to handle these ourselves in writeObject()/readObject() + private transient Field[] fields; + + @SuppressWarnings("unchecked") + public PojoSerializer( + Class clazz, + TypeSerializer[] fieldSerializers, + Field[] fields, + ExecutionConfig executionConfig) { + + this.clazz = Preconditions.checkNotNull(clazz); + this.fieldSerializers = (TypeSerializer[]) Preconditions.checkNotNull(fieldSerializers); + this.fields = Preconditions.checkNotNull(fields); + this.numFields = fieldSerializers.length; + this.executionConfig = Preconditions.checkNotNull(executionConfig); + + LinkedHashSet> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); + + for (int i = 0; i < numFields; i++) { + this.fields[i].setAccessible(true); + } + + cl = Thread.currentThread().getContextClassLoader(); + + subclassSerializerCache = new HashMap, TypeSerializer>(); + + // We only want those classes that are not our own class and are actually sub-classes. 
+ List> cleanedTaggedClasses = new ArrayList>(registeredPojoTypes.size()); + for (Class registeredClass: registeredPojoTypes) { + if (registeredClass.equals(clazz)) { + continue; + } + if (!clazz.isAssignableFrom(registeredClass)) { + continue; + } + cleanedTaggedClasses.add(registeredClass); + + } + this.registeredClasses = new LinkedHashMap, Integer>(cleanedTaggedClasses.size()); + registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()]; + + int id = 0; + for (Class registeredClass: cleanedTaggedClasses) { + this.registeredClasses.put(registeredClass, id); + TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass); + registeredSerializers[id] = typeInfo.createSerializer(executionConfig); + + id++; + } + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + out.writeInt(fields.length); + for (Field field: fields) { + out.writeObject(field.getDeclaringClass()); + out.writeUTF(field.getName()); + } + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + int numFields = in.readInt(); + fields = new Field[numFields]; + for (int i = 0; i < numFields; i++) { + Class clazz = (Class)in.readObject(); + String fieldName = in.readUTF(); + fields[i] = null; + // try superclasses as well + while (clazz != null) { + try { + fields[i] = clazz.getDeclaredField(fieldName); + fields[i].setAccessible(true); + break; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + if (fields[i] == null) { + throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." + + " (" + fieldName + ")"); + } + } + + cl = Thread.currentThread().getContextClassLoader(); + subclassSerializerCache = new HashMap, TypeSerializer>(); + } + + private TypeSerializer getSubclassSerializer(Class subclass) { + TypeSerializer result = subclassSerializerCache.get(subclass); + if (result == null) { + + TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass); + result = typeInfo.createSerializer(executionConfig); + if (result instanceof PojoSerializer) { + PojoSerializer subclassSerializer = (PojoSerializer) result; + subclassSerializer.copyBaseFieldOrder(this); + } + subclassSerializerCache.put(subclass, result); + + } + return result; + } + + @SuppressWarnings("unused") + private boolean hasField(Field f) { + for (Field field: fields) { + if (f.equals(field)) { + return true; + } + } + return false; + } + + private void copyBaseFieldOrder(PojoSerializer baseSerializer) { + // do nothing for now, but in the future, adapt subclass serializer to have same + // ordering as base class serializer so that binary comparison on base class fields + // can work + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public PojoSerializer duplicate() { + boolean stateful = false; + TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + + for (int i = 0; i < fieldSerializers.length; i++) { + duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); + if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + // at least one of them is stateful + stateful = true; + } + } + + if (stateful) { + return new PojoSerializer(clazz, duplicateFieldSerializers, fields, executionConfig); + } else { + return this; + } + } + + + @Override + public T createInstance() { + if (clazz.isInterface() || 
Modifier.isAbstract(clazz.getModifiers())) {
+			return null;
+		}
+		try {
+			T t = clazz.newInstance();
+			initializeFields(t);
+			return t;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate class.", e);
+		}
+	}
+
+	protected void initializeFields(T t) {
+		for (int i = 0; i < numFields; i++) {
+			try {
+				fields[i].set(t, fieldSerializers[i].createInstance());
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Cannot initialize fields.", e);
+			}
+		}
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T copy(T from) {
+		if (from == null) {
+			return null;
+		}
+
+		Class<?> actualType = from.getClass();
+		if (actualType == clazz) {
+			T target;
+			try {
+				target = (T) from.getClass().newInstance();
+			}
+			catch (Throwable t) {
+				throw new RuntimeException("Cannot instantiate class.", t);
+			}
+			// no subclass
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object value = fields[i].get(from);
+					if (value != null) {
+						Object copy = fieldSerializers[i].copy(value);
+						fields[i].set(target, copy);
+					}
+					else {
+						fields[i].set(target, null);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
+			}
+			return target;
+		} else {
+			// subclass
+			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
+			return (T) subclassSerializer.copy(from);
+		}
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T copy(T from, T reuse) {
+		if (from == null) {
+			return null;
+		}
+
+		Class<?> actualType = from.getClass();
+		if (reuse == null || actualType != reuse.getClass()) {
+			// cannot reuse, do a non-reuse copy
+			return copy(from);
+		}
+
+		if (actualType == clazz) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object value = fields[i].get(from);
+					if (value != null) {
+						Object reuseValue = fields[i].get(reuse);
+						Object copy;
+						if (reuseValue != null) {
+							copy = fieldSerializers[i].copy(value, reuseValue);
+						}
+						else {
+							copy = fieldSerializers[i].copy(value);
+						}
+						fields[i].set(reuse, copy);
+					}
+					else {
+						fields[i].set(reuse, null);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
+			}
+		} else {
+			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
+			reuse = (T) subclassSerializer.copy(from, reuse);
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
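serialize() and deserialize() below prepend a one-byte header assembled from the IS_NULL, NO_SUBCLASS, IS_SUBCLASS and IS_TAGGED_SUBCLASS flags declared at the top of this class. A minimal sketch of how a reader branches on that byte, mirroring the logic below (source is an assumed DataInputView):

    int flags = source.readByte();
    if ((flags & IS_NULL) != 0) {
        // null record: nothing else follows on the wire
    } else if ((flags & IS_SUBCLASS) != 0) {
        String className = source.readUTF();  // unregistered subclass: full class name
    } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
        int tag = source.readByte();          // registered subclass: index into registeredSerializers
    } else {
        // NO_SUBCLASS: the declared type's fields follow, each preceded by a null-marker boolean
    }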
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void serialize(T value, DataOutputView target) throws IOException {
+		int flags = 0;
+		// handle null values
+		if (value == null) {
+			flags |= IS_NULL;
+			target.writeByte(flags);
+			return;
+		}
+
+		Integer subclassTag = -1;
+		Class<?> actualClass = value.getClass();
+		TypeSerializer subclassSerializer = null;
+		if (clazz != actualClass) {
+			subclassTag = registeredClasses.get(actualClass);
+			if (subclassTag != null) {
+				flags |= IS_TAGGED_SUBCLASS;
+				subclassSerializer = registeredSerializers[subclassTag];
+			} else {
+				flags |= IS_SUBCLASS;
+				subclassSerializer = getSubclassSerializer(actualClass);
+			}
+		} else {
+			flags |= NO_SUBCLASS;
+		}
+
+		target.writeByte(flags);
+
+		if ((flags & IS_SUBCLASS) != 0) {
+			target.writeUTF(actualClass.getName());
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			target.writeByte(subclassTag);
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object o = fields[i].get(value);
+					if (o == null) {
+						target.writeBoolean(true); // null field handling
+					} else {
+						target.writeBoolean(false);
+						fieldSerializers[i].serialize(o, target);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Error during POJO serialization, this should not happen since we check the fields before.");
+			}
+		} else {
+			// subclass
+			if (subclassSerializer != null) {
+				subclassSerializer.serialize(value, target);
+			}
+		}
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T deserialize(DataInputView source) throws IOException {
+		int flags = source.readByte();
+		if ((flags & IS_NULL) != 0) {
+			return null;
+		}
+
+		T target;
+
+		Class<?> actualSubclass = null;
+		TypeSerializer subclassSerializer = null;
+
+		if ((flags & IS_SUBCLASS) != 0) {
+			String subclassName = source.readUTF();
+			try {
+				actualSubclass = Class.forName(subclassName, true, cl);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot load subclass " + subclassName + ".", e);
+			}
+			subclassSerializer = getSubclassSerializer(actualSubclass);
+			target = (T) subclassSerializer.createInstance();
+			// also initialize fields for which the subclass serializer is not responsible
+			initializeFields(target);
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			int subclassTag = source.readByte();
+			subclassSerializer = registeredSerializers[subclassTag];
+			target = (T) subclassSerializer.createInstance();
+			// also initialize fields for which the subclass serializer is not responsible
+			initializeFields(target);
+		} else {
+			target = createInstance();
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					boolean isNull = source.readBoolean();
+					if (isNull) {
+						fields[i].set(target, null);
+					} else {
+						Object field = fieldSerializers[i].deserialize(source);
+						fields[i].set(target, field);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Error during POJO deserialization, this should not happen since we check the fields before.");
+			}
+		} else {
+			if (subclassSerializer != null) {
+				target = (T) subclassSerializer.deserialize(target, source);
+			}
+		}
+		return target;
+	}
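The reuse variant below follows the same header protocol but deserializes into an existing object whenever the concrete classes match, falling back to fresh instantiation otherwise. A hedged usage sketch (serializer, input, hasMore and process are assumed helpers, MyPojo a hypothetical type):

    MyPojo buffer = serializer.createInstance();
    while (hasMore(input)) {
        buffer = serializer.deserialize(buffer, input);  // may return a new object on class mismatch
        process(buffer);
    }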
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+
+		// handle null values
+		int flags = source.readByte();
+		if ((flags & IS_NULL) != 0) {
+			return null;
+		}
+
+		Class<?> subclass = null;
+		TypeSerializer subclassSerializer = null;
+		if ((flags & IS_SUBCLASS) != 0) {
+			String subclassName = source.readUTF();
+			try {
+				subclass = Class.forName(subclassName, true, cl);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot load subclass " + subclassName + ".", e);
+			}
+			subclassSerializer = getSubclassSerializer(subclass);
+
+			if (reuse == null || subclass != reuse.getClass()) {
+				// cannot reuse
+				reuse = (T) subclassSerializer.createInstance();
+				// also initialize fields for which the subclass serializer is not responsible
+				initializeFields(reuse);
+			}
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			int subclassTag = source.readByte();
+			subclassSerializer = registeredSerializers[subclassTag];
+
+			if (reuse == null || ((PojoSerializer) subclassSerializer).clazz != reuse.getClass()) {
+				// cannot reuse
+				reuse = (T) subclassSerializer.createInstance();
+				// also initialize fields for which the subclass serializer is not responsible
+				initializeFields(reuse);
+			}
+		} else {
+			if (reuse == null || clazz != reuse.getClass()) {
+				reuse = createInstance();
+			}
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					boolean isNull = source.readBoolean();
+					if (isNull) {
+						fields[i].set(reuse, null);
+					} else {
+						Object field;
+
+						Object reuseField = fields[i].get(reuse);
+						if (reuseField != null) {
+							field = fieldSerializers[i].deserialize(reuseField, source);
+						}
+						else {
+							field = fieldSerializers[i].deserialize(source);
+						}
+
+						fields[i].set(reuse, field);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException(
+						"Error during POJO deserialization, this should not happen since we check the fields before.");
+			}
+		} else {
+			if (subclassSerializer != null) {
+				reuse = (T) subclassSerializer.deserialize(reuse, source);
+			}
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// copy the flags
+		int flags = source.readByte();
+		target.writeByte(flags);
+
+		if ((flags & IS_NULL) != 0) {
+			// is a null value, nothing further to copy
+			return;
+		}
+
+		TypeSerializer subclassSerializer = null;
+		if ((flags & IS_SUBCLASS) != 0) {
+			String className = source.readUTF();
+			target.writeUTF(className);
+			try {
+				Class<?> subclass = Class.forName(className, true, Thread.currentThread()
+						.getContextClassLoader());
+				subclassSerializer = getSubclassSerializer(subclass);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot load subclass " + className + ".", e);
+			}
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			int subclassTag = source.readByte();
+			target.writeByte(subclassTag);
+			subclassSerializer = registeredSerializers[subclassTag];
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			for (int i = 0; i < numFields; i++) {
+				boolean isNull = source.readBoolean();
+				target.writeBoolean(isNull);
+				if (!isNull) {
+					fieldSerializers[i].copy(source, target);
+				}
+			}
+		} else {
+			if (subclassSerializer != null) {
+				subclassSerializer.copy(source, target);
+			}
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) +
+				Objects.hash(clazz, numFields, registeredClasses);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof PojoSerializer) {
+			PojoSerializer<?> other = (PojoSerializer<?>) obj;
+
+			return other.canEqual(this) &&
+					clazz == other.clazz &&
+					Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+					Arrays.equals(registeredSerializers, other.registeredSerializers) &&
+					numFields == other.numFields &&
+					registeredClasses.equals(other.registeredClasses);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoSerializer;
+	}
+}
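The tagged-subclass path above is only taken for classes present in ExecutionConfig#getRegisteredPojoTypes(). A sketch of how a user feeds that set, assuming ExecutionConfig#registerPojoType as the registration call and hypothetical classes MyBasePojo/MySubPojo:

    ExecutionConfig config = new ExecutionConfig();
    // With this registration, PojoSerializer<MyBasePojo> writes a one-byte tag
    // for MySubPojo instances (IS_TAGGED_SUBCLASS) instead of the full class
    // name it would otherwise write (IS_SUBCLASS).
    config.registerPojoType(MySubPojo.class);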

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
new file mode 100644
index 0000000..4b734a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class RuntimeComparatorFactory<T> implements TypeComparatorFactory<T>, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+
+	private static final String CONFIG_KEY = "SER_DATA";
+
+	private TypeComparator<T> comparator;
+
+
+	public RuntimeComparatorFactory() {}
+
+	public RuntimeComparatorFactory(TypeComparator<T> comparator) {
+		this.comparator = comparator;
+	}
+
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+		try {
+			InstantiationUtil.writeObjectToConfig(comparator, config, CONFIG_KEY);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not serialize comparator into the configuration.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+		try {
+			comparator = (TypeComparator<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl);
+		}
+		catch (ClassNotFoundException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not read the comparator from the configuration.", e);
+		}
+	}
+
+	@Override
+	public TypeComparator<T> createComparator() {
+		if (comparator != null) {
+			return comparator;
+		} else {
+			throw new RuntimeException("ComparatorFactory has not been initialized from configuration.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
new file mode 100644
index 0000000..31e28f7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; + +public final class RuntimePairComparatorFactory + implements TypePairComparatorFactory, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public TypePairComparator createComparator12( + TypeComparator comparator1, + TypeComparator comparator2) { + return new GenericPairComparator(comparator1, comparator2); + } + + @Override + public TypePairComparator createComparator21( + TypeComparator comparator1, + TypeComparator comparator2) { + return new GenericPairComparator(comparator2, comparator1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java new file mode 100644 index 0000000..96aff73 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.InstantiationUtil; + +public final class RuntimeSerializerFactory implements TypeSerializerFactory, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + + private static final String CONFIG_KEY_SER = "SER_DATA"; + + private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; + + + private TypeSerializer serializer; + + private boolean firstSerializer = true; + + private Class clazz; + + // Because we read the class from the TaskConfig and instantiate ourselves + public RuntimeSerializerFactory() {} + + public RuntimeSerializerFactory(TypeSerializer serializer, Class clazz) { + if (serializer == null || clazz == null) { + throw new NullPointerException(); + } + + this.clazz = clazz; + this.serializer = serializer; + } + + + @Override + public void writeParametersToConfig(Configuration config) { + try { + InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); + InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER); + } + catch (Exception e) { + throw new RuntimeException("Could not serialize serializer into the configuration.", e); + } + } + + @SuppressWarnings("unchecked") + @Override + public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { + if (config == null || cl == null) { + throw new NullPointerException(); + } + + try { + this.clazz = (Class) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); + this.serializer = (TypeSerializer) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl); + firstSerializer = true; + } + catch (ClassNotFoundException e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException("Could not load deserializer from the configuration.", e); + } + } + + @Override + public TypeSerializer getSerializer() { + if (this.serializer != null) { + if (firstSerializer) { + firstSerializer = false; + return this.serializer; + } else { + return this.serializer.duplicate(); + } + } else { + throw new RuntimeException("SerializerFactory has not been initialized from configuration."); + } + } + + @Override + public Class getDataType() { + return clazz; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return clazz.hashCode() ^ serializer.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof RuntimeSerializerFactory) { + RuntimeSerializerFactory other = (RuntimeSerializerFactory) obj; + + return this.clazz == other.clazz && + this.serializer.equals(other.serializer); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java new file mode 100644 index 0000000..a06ff1a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java @@ -0,0 +1,121 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class Tuple0Serializer extends TupleSerializer { + + private static final long serialVersionUID = 1278813169022975971L; + + public static final Tuple0Serializer INSTANCE = new Tuple0Serializer(); + + // ------------------------------------------------------------------------ + + private Tuple0Serializer() { + super(Tuple0.class, new TypeSerializer[0]); + } + + // ------------------------------------------------------------------------ + + @Override + public Tuple0Serializer duplicate() { + return this; + } + + @Override + public Tuple0 createInstance() { + return Tuple0.INSTANCE; + } + + @Override + public Tuple0 createInstance(Object[] fields) { + if (fields == null || fields.length == 0) { + return Tuple0.INSTANCE; + } + + throw new UnsupportedOperationException( + "Tuple0 cannot take any data, as it has zero fields."); + } + + @Override + public Tuple0 copy(Tuple0 from) { + return from; + } + + @Override + public Tuple0 copy(Tuple0 from, Tuple0 reuse) { + return reuse; + } + + @Override + public int getLength() { + return 1; + } + + @Override + public void serialize(Tuple0 record, DataOutputView target) throws IOException { + target.writeByte(42); + } + + @Override + public Tuple0 deserialize(DataInputView source) throws IOException { + source.readByte(); + return Tuple0.INSTANCE; + } + + @Override + public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws IOException { + source.readByte(); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.writeByte(source.readByte()); + } + + // ------------------------------------------------------------------------ + + + @Override + public int hashCode() { + return Tuple0Serializer.class.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tuple0Serializer) { + Tuple0Serializer other = (Tuple0Serializer) obj; + + return other.canEqual(this); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof Tuple0Serializer; + } + + @Override + public String toString() { + return "Tuple0Serializer"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java new file mode 100644 index 0000000..875ecc2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.NullFieldException; +import org.apache.flink.types.NullKeyFieldException; + + +public final class TupleComparator extends TupleComparatorBase { + + private static final long serialVersionUID = 1L; + + public TupleComparator(int[] keyPositions, TypeComparator[] comparators, TypeSerializer[] serializers) { + super(keyPositions, comparators, serializers); + } + + private TupleComparator(TupleComparator toClone) { + super(toClone); + } + + // -------------------------------------------------------------------------------------------- + // Comparator Methods + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + @Override + public int hash(T value) { + int i = 0; + try { + int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0])); + for (i = 1; i < this.keyPositions.length; i++) { + code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component + code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i])); + } + return code; + } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + catch (IndexOutOfBoundsException iobex) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @SuppressWarnings("unchecked") + @Override + public void setReference(T toCompare) { + int i = 0; + try { + for (; i < this.keyPositions.length; i++) { + this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i])); + } + } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + catch (IndexOutOfBoundsException iobex) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @SuppressWarnings("unchecked") + @Override + public boolean equalToReference(T candidate) { + int i = 0; + try { + for (; i < this.keyPositions.length; i++) { + if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) { + return false; + } + } + return true; + } 
+ catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + catch (IndexOutOfBoundsException iobex) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @SuppressWarnings("unchecked") + @Override + public int compare(T first, T second) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + int keyPos = keyPositions[i]; + int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos)); + + if (cmp != 0) { + return cmp; + } + } + return 0; + } + catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } + catch (IndexOutOfBoundsException iobex) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @SuppressWarnings("unchecked") + @Override + public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) { + int i = 0; + try { + for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) { + int len = this.normalizedKeyLengths[i]; + len = numBytes >= len ? len : numBytes; + this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len); + numBytes -= len; + offset += len; + } + } catch (NullFieldException nfex) { + throw new NullKeyFieldException(nfex); + } catch (NullPointerException npex) { + throw new NullKeyFieldException(this.keyPositions[i]); + } + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + int localIndex = index; + for(int i = 0; i < comparators.length; i++) { + localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex); + } + return localIndex - index; + } + + public TypeComparator duplicate() { + return new TupleComparator(this); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java new file mode 100644 index 0000000..28169e5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullKeyFieldException;
+
+
+public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** key positions describe which fields are keys in what order */
+    protected int[] keyPositions;
+
+    /** comparators for the key fields, in the same order as the key fields */
+    @SuppressWarnings("rawtypes")
+    protected TypeComparator[] comparators;
+
+    protected int[] normalizedKeyLengths;
+
+    protected int numLeadingNormalizableKeys;
+
+    protected int normalizableKeyPrefixLen;
+
+    protected boolean invertNormKey;
+
+
+    /** serializers to deserialize the first n fields for comparison */
+    @SuppressWarnings("rawtypes")
+    protected TypeSerializer[] serializers;
+
+    // cache for the deserialized field objects
+    protected transient Object[] deserializedFields1;
+    protected transient Object[] deserializedFields2;
+
+
+    @SuppressWarnings("unchecked")
+    public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
+        // set the default utils
+        this.keyPositions = keyPositions;
+        this.comparators = (TypeComparator[]) comparators;
+        this.serializers = (TypeSerializer[]) serializers;
+
+        // set up auxiliary fields for normalized key support
+        this.normalizedKeyLengths = new int[keyPositions.length];
+        int nKeys = 0;
+        int nKeyLen = 0;
+        boolean inverted = false;
+
+        for (int i = 0; i < this.keyPositions.length; i++) {
+            TypeComparator<?> k = this.comparators[i];
+
+            // as long as the leading keys support normalized keys, we can build up the composite key
+            if (k.supportsNormalizedKey()) {
+                if (i == 0) {
+                    // the first comparator decides whether we need to invert the key direction
+                    inverted = k.invertNormalizedKey();
+                }
+                else if (k.invertNormalizedKey() != inverted) {
+                    // if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+                    break;
+                }
+
+                nKeys++;
+                final int len = k.getNormalizeKeyLen();
+                if (len < 0) {
+                    throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+                }
+                this.normalizedKeyLengths[i] = len;
+                nKeyLen += len;
+
+                if (nKeyLen < 0) {
+                    // overflow, which means we are out of budget for normalized key space anyway
+                    nKeyLen = Integer.MAX_VALUE;
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        this.numLeadingNormalizableKeys = nKeys;
+        this.normalizableKeyPrefixLen = nKeyLen;
+        this.invertNormKey = inverted;
+    }
+
+    protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
+        privateDuplicate(toClone);
+    }
+
+    // We need this because we cannot call the cloning constructor from the
+    // ScalaTupleComparator
+    protected void privateDuplicate(TupleComparatorBase<T> toClone) {
+        // copy fields and serializer factories
+        this.keyPositions = toClone.keyPositions;
+
+        this.serializers = new TypeSerializer[toClone.serializers.length];
+        for (int i = 0; i < toClone.serializers.length; i++) {
+            this.serializers[i] = toClone.serializers[i].duplicate();
+        }
+
+        this.comparators = new TypeComparator[toClone.comparators.length];
+        for (int i = 0; i < toClone.comparators.length; i++) {
+            this.comparators[i] = toClone.comparators[i].duplicate();
+        }
+
+        this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+        this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+        this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+        this.invertNormKey = toClone.invertNormKey;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Comparator Methods
+    // --------------------------------------------------------------------------------------------
+
+    protected int[] getKeyPositions() {
+        return this.keyPositions;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void getFlatComparator(List<TypeComparator> flatComparators) {
+        for (int i = 0; i < comparators.length; i++) {
+            if (comparators[i] instanceof CompositeTypeComparator) {
+                ((CompositeTypeComparator) comparators[i]).getFlatComparator(flatComparators);
+            } else {
+                flatComparators.add(comparators[i]);
+            }
+        }
+    }
+
+    @Override
+    public int compareToReference(TypeComparator<T> referencedComparator) {
+        TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
+
+        int i = 0;
+        try {
+            for (; i < this.keyPositions.length; i++) {
+                @SuppressWarnings("unchecked")
+                int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+                if (cmp != 0) {
+                    return cmp;
+                }
+            }
+            return 0;
+        }
+        catch (NullPointerException npex) {
+            throw new NullKeyFieldException(keyPositions[i]);
+        }
+        catch (IndexOutOfBoundsException iobex) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+        if (deserializedFields1 == null) {
+            instantiateDeserializationUtils();
+        }
+
+        int i = 0;
+        try {
+            for (; i < serializers.length; i++) {
+                deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
+                deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
+            }
+
+            for (i = 0; i < keyPositions.length; i++) {
+                int keyPos = keyPositions[i];
+                int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
+                if (cmp != 0) {
+                    return cmp;
+                }
+            }
+
+            return 0;
+        } catch (NullPointerException npex) {
+            throw new NullKeyFieldException(keyPositions[i]);
+        } catch (IndexOutOfBoundsException iobex) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+        }
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return this.numLeadingNormalizableKeys > 0;
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        return this.normalizableKeyPrefixLen;
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return this.numLeadingNormalizableKeys < this.keyPositions.length ||
+            this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+            this.normalizableKeyPrefixLen > keyBytes;
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return this.invertNormKey;
+    }
+
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    protected final void instantiateDeserializationUtils() {
+        this.deserializedFields1 = new Object[this.serializers.length];
+        this.deserializedFields2 = new Object[this.serializers.length];
+
+        for (int i = 0; i < this.serializers.length; i++) {
+            this.deserializedFields1[i] = this.serializers[i].createInstance();
+            this.deserializedFields2[i] = this.serializers[i].createInstance();
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A sequence of prime numbers to be used for salting the computed hash values.
+     * Based on some empirical evidence, we are using a 32-element subsequence of the
+     * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
+     *
+     * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
+     * @see <a href="http://oeis.org/A068652">http://oeis.org/A068652</a>
+     */
+    public static final int[] HASH_SALT = new int[] {
+        73   , 79   , 97   , 113  , 131  , 197  , 199  , 311  ,
+        337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193 ,
+        1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939,
+        19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
+}
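[Note, not part of the patch: the HASH_SALT table above is what the concrete tuple comparators fold per-field hash codes through. A minimal sketch of that salting pattern, under the assumption of a raw TypeComparator[] like the field above; the class and method names here are invented for illustration.]

    import org.apache.flink.api.common.typeutils.TypeComparator;

    class SaltedHashSketch {
        // Folds per-field hash codes together using the HASH_SALT primes;
        // the index is masked with 0x1F because the salt array has exactly
        // 32 entries, so longer tuples wrap around the table.
        @SuppressWarnings({ "rawtypes", "unchecked" })
        static int saltedHash(Object[] keyValues, TypeComparator[] fieldComparators) {
            int code = 0;
            for (int i = 0; i < fieldComparators.length; i++) {
                code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; // wrap at 32 salts
                code += fieldComparators[i].hash(keyValues[i]);
            }
            return code;
        }
    }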
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
new file mode 100644
index 0000000..0897063
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullFieldException;
+
+
+public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+        super(tupleClass, fieldSerializers);
+    }
+
+    @Override
+    public TupleSerializer<T> duplicate() {
+        boolean stateful = false;
+        TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer<?>[fieldSerializers.length];
+
+        for (int i = 0; i < fieldSerializers.length; i++) {
+            duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+            if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+                // at least one of them is stateful
+                stateful = true;
+            }
+        }
+
+        if (stateful) {
+            return new TupleSerializer<T>(tupleClass, duplicateFieldSerializers);
+        } else {
+            return this;
+        }
+    }
+
+    @Override
+    public T createInstance() {
+        try {
+            T t = tupleClass.newInstance();
+
+            for (int i = 0; i < arity; i++) {
+                t.setField(fieldSerializers[i].createInstance(), i);
+            }
+
+            return t;
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Cannot instantiate tuple.", e);
+        }
+    }
+
+    @Override
+    public T createInstance(Object[] fields) {
+        try {
+            T t = tupleClass.newInstance();
+
+            for (int i = 0; i < arity; i++) {
+                t.setField(fields[i], i);
+            }
+
+            return t;
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Cannot instantiate tuple.", e);
+        }
+    }
+
+    @Override
+    public T createOrReuseInstance(Object[] fields, T reuse) {
+        for (int i = 0; i < arity; i++) {
+            reuse.setField(fields[i], i);
+        }
+        return reuse;
+    }
+
+    @Override
+    public T copy(T from) {
+        T target = instantiateRaw();
+        for (int i = 0; i < arity; i++) {
+            Object copy = fieldSerializers[i].copy(from.getField(i));
+            target.setField(copy, i);
+        }
+        return target;
+    }
+
+    @Override
+    public T copy(T from, T reuse) {
+        for (int i = 0; i < arity; i++) {
+            Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
+            reuse.setField(copy, i);
+        }
+
+        return reuse;
+    }
+
+    @Override
+    public void serialize(T value, DataOutputView target) throws IOException {
+        for (int i = 0; i < arity; i++) {
+            Object o = value.getField(i);
+            try {
+                fieldSerializers[i].serialize(o, target);
+            } catch (NullPointerException npex) {
+                throw new NullFieldException(i, npex);
+            }
+        }
+    }
+
+    @Override
+    public T deserialize(DataInputView source) throws IOException {
+        T tuple = instantiateRaw();
+        for (int i = 0; i < arity; i++) {
+            Object field = fieldSerializers[i].deserialize(source);
+            tuple.setField(field, i);
+        }
+        return tuple;
+    }
+
+    @Override
+    public T deserialize(T reuse, DataInputView source) throws IOException {
+        for (int i = 0; i < arity; i++) {
+            Object field = fieldSerializers[i].deserialize(reuse.getField(i), source);
+            reuse.setField(field, i);
+        }
+        return reuse;
+    }
+
+    private T instantiateRaw() {
+        try {
+            return tupleClass.newInstance();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Cannot instantiate tuple.", e);
+        }
+    }
+}
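[Note, not part of the patch: a usage sketch for the serializer above, wiring a TupleSerializer for Tuple2<Integer, String> by hand. It assumes the IntSerializer/StringSerializer singletons from org.apache.flink.api.common.typeutils.base; in a real job the serializer comes out of the type extraction stack rather than direct construction.]

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.base.IntSerializer;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;

    class TupleSerializerSketch {
        @SuppressWarnings("unchecked")
        static void demo() {
            // The raw Tuple2.class needs an unchecked cast to the parameterized form.
            Class<Tuple2<Integer, String>> clazz =
                    (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class;

            TypeSerializer<?>[] fieldSerializers = {
                    IntSerializer.INSTANCE, StringSerializer.INSTANCE };

            TupleSerializer<Tuple2<Integer, String>> serializer =
                    new TupleSerializer<Tuple2<Integer, String>>(clazz, fieldSerializers);

            // copy() deep-copies field by field through the field serializers
            Tuple2<Integer, String> original = new Tuple2<Integer, String>(42, "flink");
            Tuple2<Integer, String> copy = serializer.copy(original);

            // duplicate() returns `this` here, because both field serializers are stateless
            TypeSerializer<Tuple2<Integer, String>> dup = serializer.duplicate();
        }
    }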
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
new file mode 100644
index 0000000..fc657a1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final Class<T> tupleClass;
+
+    protected final TypeSerializer<Object>[] fieldSerializers;
+
+    protected final int arity;
+
+    @SuppressWarnings("unchecked")
+    public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+        this.tupleClass = Preconditions.checkNotNull(tupleClass);
+        this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+        this.arity = fieldSerializers.length;
+    }
+
+    public Class<T> getTupleClass() {
+        return this.tupleClass;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    public int getArity() {
+        return arity;
+    }
+
+    // We use this in the Aggregate and Distinct Operators to create instances
+    // of immutable Tuples (i.e. Scala Tuples)
+    public abstract T createInstance(Object[] fields);
+
+    public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        for (int i = 0; i < arity; i++) {
+            fieldSerializers[i].copy(source, target);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof TupleSerializerBase) {
+            TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj;
+
+            return other.canEqual(this) &&
+                tupleClass == other.tupleClass &&
+                Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+                arity == other.arity;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof TupleSerializerBase;
+    }
+}
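[Note, not part of the patch: the two abstract factory methods above are the hook the runtime uses to build tuples from a flat field array. A short sketch of the contract, continuing with the hypothetical `serializer` from the previous sketch.]

    // Build a fresh tuple from a flat field array, then overwrite it in
    // place via createOrReuseInstance to avoid a second allocation.
    Tuple2<Integer, String> fresh =
            serializer.createInstance(new Object[] { 1, "one" });
    Tuple2<Integer, String> reused =
            serializer.createOrReuseInstance(new Object[] { 2, "two" }, fresh);
    // `reused` is the same object as `fresh`, now holding (2, "two")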
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
new file mode 100644
index 0000000..4b9629a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Comparator for all {@link Value} types that are also {@link Comparable}.
+ */
+public class ValueComparator<T extends Value & Comparable<T>> extends TypeComparator<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> type;
+
+    private final boolean ascendingComparison;
+
+    private transient T reference;
+
+    private transient T tempReference;
+
+    private transient Kryo kryo;
+
+    @SuppressWarnings("rawtypes")
+    private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+    public ValueComparator(boolean ascending, Class<T> type) {
+        this.type = type;
+        this.ascendingComparison = ascending;
+    }
+
+    @Override
+    public int hash(T record) {
+        return record.hashCode();
+    }
+
+    @Override
+    public void setReference(T toCompare) {
+        checkKryoInitialized();
+
+        reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer<T>(type));
+    }
+
+    @Override
+    public boolean equalToReference(T candidate) {
+        return candidate.equals(this.reference);
+    }
+
+    @Override
+    public int compareToReference(TypeComparator<T> referencedComparator) {
+        T otherRef = ((ValueComparator<T>) referencedComparator).reference;
+        int comp = otherRef.compareTo(reference);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public int compare(T first, T second) {
+        int comp = first.compareTo(second);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+        if (reference == null) {
+            reference = InstantiationUtil.instantiate(type, Value.class);
+        }
+        if (tempReference == null) {
+            tempReference = InstantiationUtil.instantiate(type, Value.class);
+        }
+
+        reference.read(firstSource);
+        tempReference.read(secondSource);
+        int comp = reference.compareTo(tempReference);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return NormalizableKey.class.isAssignableFrom(type);
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        if (reference == null) {
+            reference = InstantiationUtil.instantiate(type, Value.class);
+        }
+
+        NormalizableKey<?> key = (NormalizableKey<?>) reference;
+        return key.getMaxNormalizedKeyLen();
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return keyBytes < getNormalizeKeyLen();
+    }
+
+    @Override
+    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+        NormalizableKey<?> key = (NormalizableKey<?>) record;
+        key.copyNormalizedKey(target, offset, numBytes);
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return !ascendingComparison;
+    }
+
+    @Override
+    public TypeComparator<T> duplicate() {
+        return new ValueComparator<T>(ascendingComparison, type);
+    }
+
+    private void checkKryoInitialized() {
+        if (this.kryo == null) {
+            this.kryo = new Kryo();
+
+            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+            kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+            this.kryo.setAsmEnabled(true);
+            this.kryo.register(type);
+        }
+    }
+
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public TypeComparator[] getFlatComparators() {
+        return comparators;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // unsupported normalization
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
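[Note, not part of the patch: a sketch of the comparator above in isolation, assuming StringValue from flink-core, which is a Value, Comparable, and a NormalizableKey.]

    import org.apache.flink.types.StringValue;

    class ValueComparatorSketch {
        static void demo() {
            // Ascending comparator over StringValue
            ValueComparator<StringValue> comparator =
                    new ValueComparator<StringValue>(true, StringValue.class);

            int c = comparator.compare(new StringValue("apache"), new StringValue("flink"));
            // c < 0: "apache" sorts before "flink" in ascending order

            // StringValue implements NormalizableKey, so the comparator can
            // also sort on a byte-wise normalized key prefix
            boolean normalizable = comparator.supportsNormalizedKey(); // true
        }
    }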
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
new file mode 100644
index 0000000..9329866
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Serializer for {@link Value} types. Uses the value's serialization methods, and uses
+ * Kryo for deep object copies.
+ *
+ * @param <T> The type serialized.
+ */
+public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> type;
+
+    private transient Kryo kryo;
+
+    private transient T copyInstance;
+
+    // --------------------------------------------------------------------------------------------
+
+    public ValueSerializer(Class<T> type) {
+        this.type = Preconditions.checkNotNull(type);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public ValueSerializer<T> duplicate() {
+        return new ValueSerializer<T>(type);
+    }
+
+    @Override
+    public T createInstance() {
+        return InstantiationUtil.instantiate(this.type);
+    }
+
+    @Override
+    public T copy(T from) {
+        checkKryoInitialized();
+
+        return KryoUtils.copy(from, kryo, this);
+    }
+
+    @Override
+    public T copy(T from, T reuse) {
+        checkKryoInitialized();
+
+        return KryoUtils.copy(from, reuse, kryo, this);
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(T value, DataOutputView target) throws IOException {
+        value.write(target);
+    }
+
+    @Override
+    public T deserialize(DataInputView source) throws IOException {
+        return deserialize(createInstance(), source);
+    }
+
+    @Override
+    public T deserialize(T reuse, DataInputView source) throws IOException {
+        reuse.read(source);
+        return reuse;
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        if (this.copyInstance == null) {
+            this.copyInstance = InstantiationUtil.instantiate(type);
+        }
+
+        this.copyInstance.read(source);
+        this.copyInstance.write(target);
+    }
+
+    private void checkKryoInitialized() {
+        if (this.kryo == null) {
+            this.kryo = new Kryo();
+
+            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+            kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+            this.kryo.setAsmEnabled(true);
+            this.kryo.register(type);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {
+        return this.type.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof ValueSerializer) {
+            ValueSerializer<?> other = (ValueSerializer<?>) obj;
+
+            return other.canEqual(this) && type == other.type;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof ValueSerializer;
+    }
+}
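[Note, not part of the patch: a matching sketch for ValueSerializer, again assuming StringValue. serialize()/deserialize() route through StringValue's own write()/read(), while copy() goes through Kryo for a deep copy, as the javadoc above describes.]

    import org.apache.flink.types.StringValue;

    class ValueSerializerSketch {
        static void demo() {
            ValueSerializer<StringValue> serializer =
                    new ValueSerializer<StringValue>(StringValue.class);

            // Deep copy via Kryo: equal content, distinct object
            StringValue original = new StringValue("flink");
            StringValue copy = serializer.copy(original);
            // copy.equals(original) is true, copy != original
        }
    }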
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..a03369a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private Class<T> type;
+
+    private final boolean ascendingComparison;
+
+    private transient T reference;
+
+    private transient T tempReference;
+
+    private transient Kryo kryo;
+
+    @SuppressWarnings("rawtypes")
+    private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+    public WritableComparator(boolean ascending, Class<T> type) {
+        this.type = type;
+        this.ascendingComparison = ascending;
+    }
+
+    @Override
+    public int hash(T record) {
+        return record.hashCode();
+    }
+
+    @Override
+    public void setReference(T toCompare) {
+        checkKryoInitialized();
+
+        reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
+    }
+
+    @Override
+    public boolean equalToReference(T candidate) {
+        return candidate.equals(reference);
+    }
+
+    @Override
+    public int compareToReference(TypeComparator<T> referencedComparator) {
+        T otherRef = ((WritableComparator<T>) referencedComparator).reference;
+        int comp = otherRef.compareTo(reference);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public int compare(T first, T second) {
+        int comp = first.compareTo(second);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+        ensureReferenceInstantiated();
+        ensureTempReferenceInstantiated();
+
+        reference.readFields(firstSource);
+        tempReference.readFields(secondSource);
+
+        int comp = reference.compareTo(tempReference);
+        return ascendingComparison ? comp : -comp;
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return NormalizableKey.class.isAssignableFrom(type);
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        ensureReferenceInstantiated();
+
+        NormalizableKey<?> key = (NormalizableKey<?>) reference;
+        return key.getMaxNormalizedKeyLen();
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return keyBytes < getNormalizeKeyLen();
+    }
+
+    @Override
+    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+        NormalizableKey<?> key = (NormalizableKey<?>) record;
+        key.copyNormalizedKey(target, offset, numBytes);
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return !ascendingComparison;
+    }
+
+    @Override
+    public TypeComparator<T> duplicate() {
+        return new WritableComparator<T>(ascendingComparison, type);
+    }
+
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public TypeComparator[] getFlatComparators() {
+        return comparators;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // unsupported normalization
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private void checkKryoInitialized() {
+        if (this.kryo == null) {
+            this.kryo = new Kryo();
+
+            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+            kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+            this.kryo.setAsmEnabled(true);
+            this.kryo.register(type);
+        }
+    }
+
+    private void ensureReferenceInstantiated() {
+        if (reference == null) {
+            reference = InstantiationUtil.instantiate(type, Writable.class);
+        }
+    }
+
+    private void ensureTempReferenceInstantiated() {
+        if (tempReference == null) {
+            tempReference = InstantiationUtil.instantiate(type, Writable.class);
+        }
+    }
+}
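[Note, not part of the patch: a sketch of the comparator above with Hadoop's IntWritable, which is a Writable and Comparable<IntWritable> but not a Flink NormalizableKey.]

    import org.apache.hadoop.io.IntWritable;

    class WritableComparatorSketch {
        static void demo() {
            // Descending comparator over IntWritable
            WritableComparator<IntWritable> comparator =
                    new WritableComparator<IntWritable>(false, IntWritable.class);

            int c = comparator.compare(new IntWritable(1), new IntWritable(2));
            // c > 0 here: descending order negates the natural comparison

            // IntWritable does not implement Flink's NormalizableKey,
            // so normalized-key sorting is unavailable for it
            boolean normalizable = comparator.supportsNormalizedKey(); // false
        }
    }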