Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A9118184BD for ; Tue, 9 Jun 2015 16:17:31 +0000 (UTC) Received: (qmail 70961 invoked by uid 500); 9 Jun 2015 16:17:31 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 70924 invoked by uid 500); 9 Jun 2015 16:17:31 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 70912 invoked by uid 99); 9 Jun 2015 16:17:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 16:17:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 624DCDFF87; Tue, 9 Jun 2015 16:17:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ikabiljo@apache.org To: commits@giraph.apache.org Date: Tue, 09 Jun 2015 16:17:32 -0000 Message-Id: <88d1921df1fd4cc59a5a37b1bef33479@git.apache.org> In-Reply-To: <3a9fb0470747415eb91f5486a90b1899@git.apache.org> References: <3a9fb0470747415eb91f5486a90b1899@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: updated refs/heads/trunk to 06a1084 [GIRAPH-1010] Add utilities to allow Kryo to serialize objects Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D39513 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/06a1084a Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/06a1084a 
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/06a1084a Branch: refs/heads/trunk Commit: 06a1084af99ee46a03ebbbee53e3224ca8862132 Parents: 27661ba Author: Igor Kabiljo Authored: Wed Jun 3 13:35:20 2015 -0700 Committer: Igor Kabiljo Committed: Tue Jun 9 09:17:06 2015 -0700 ---------------------------------------------------------------------- giraph-core/pom.xml | 12 + .../org/apache/giraph/utils/WritableUtils.java | 126 +++-- .../writable/kryo/DataInputWrapperStream.java | 59 +++ .../writable/kryo/DataOutputWrapperStream.java | 47 ++ .../apache/giraph/writable/kryo/HadoopKryo.java | 435 +++++++++++++++++ .../giraph/writable/kryo/KryoWritable.java | 40 ++ .../writable/kryo/KryoWritableWrapper.java | 110 +++++ .../giraph/writable/kryo/TransientRandom.java | 69 +++ .../kryo/markers/KryoIgnoreWritable.java | 32 ++ .../writable/kryo/markers/NonKryoWritable.java | 26 + .../writable/kryo/markers/package-info.java | 21 + .../giraph/writable/kryo/package-info.java | 23 + .../serializers/ArraysAsListSerializer.java | 44 ++ .../CollectionsNCopiesSerializer.java | 52 ++ .../serializers/DirectWritableSerializer.java | 83 ++++ .../kryo/serializers/FastUtilSerializer.java | 476 +++++++++++++++++++ .../serializers/ReusableFieldSerializer.java | 57 +++ .../writable/kryo/serializers/package-info.java | 21 + .../giraph/writable/kryo/KryoWritableTest.java | 195 ++++++++ .../writable/kryo/KryoWritableWrapperTest.java | 292 ++++++++++++ pom.xml | 33 ++ 21 files changed, 2217 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml index 6fea1a3..4719a5d 100644 --- a/giraph-core/pom.xml +++ b/giraph-core/pom.xml @@ -542,6 +542,18 @@ under the License. 
org.python jython + + com.esotericsoftware + kryo + + + de.javakaffee + kryo-serializers + + + org.objenesis + objenesis + http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index be2ef9d..68ed89a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -748,42 +748,6 @@ public class WritableUtils { } /** - * Create a copy of Writable object, by serializing and deserializing it. - * - * @param reusableOut Reusable output stream to serialize into - * @param reusableIn Reusable input stream to deserialize out of - * @param original Original value of which to make a copy - * @param conf Configuration - * @param Type of the object - * @return Copy of the original value - */ - public static T createCopy( - UnsafeByteArrayOutputStream reusableOut, - UnsafeReusableByteArrayInput reusableIn, T original, - ImmutableClassesGiraphConfiguration conf) { - T copy = (T) createWritable(original.getClass(), conf); - - try { - reusableOut.reset(); - original.write(reusableOut); - reusableIn.initialize( - reusableOut.getByteArray(), 0, reusableOut.getPos()); - copy.readFields(reusableIn); - - if (reusableIn.available() != 0) { - throw new RuntimeException("Serialization of " + - original.getClass() + " encountered issues, " + - reusableIn.available() + " bytes left to be read"); - } - } catch (IOException e) { - throw new IllegalStateException( - "IOException occurred while trying to create a copy " + - original.getClass(), e); - } - return copy; - } - - /** * Writes primitive int array of ints into output stream. * Array can be null or empty. 
* @param array array to be written @@ -896,4 +860,94 @@ public class WritableUtils { return null; } } + + + /** + * Copy {@code from} into {@code to}, by serializing and deserializing it. + * Since it is creating streams inside, it's mostly useful for + * tests/non-performant code. + * + * @param from Object to copy from + * @param to Object to copy into + * @param Type of the object + */ + public static void copyInto(T from, T to) { + copyInto(from, to, false); + } + + /** + * Copy {@code from} into {@code to}, by serializing and deserializing it. + * Since it is creating streams inside, it's mostly useful for + * tests/non-performant code. + * + * @param from Object to copy from + * @param to Object to copy into + * @param checkOverRead if true, will add one more byte at the end of writing, + * to make sure read is not touching it. Useful for tests + * @param Type of the object + */ + public static void copyInto( + T from, T to, boolean checkOverRead) { + try { + if (from.getClass() != to.getClass()) { + throw new RuntimeException( + "Trying to copy from " + from.getClass() + + " into " + to.getClass()); + } + + UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(); + from.write(out); + if (checkOverRead) { + out.writeByte(0); + } + + UnsafeByteArrayInputStream in = + new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos()); + to.readFields(in); + + if (in.available() != (checkOverRead ? 1 : 0)) { + throw new RuntimeException( + "Serialization encountered issues with " + from.getClass() + ", " + + (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read"); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Create a copy of Writable object, by serializing and deserializing it. 
+ * + * @param reusableOut Reusable output stream to serialize into + * @param reusableIn Reusable input stream to deserialize out of + * @param original Original value of which to make a copy + * @param conf Configuration + * @param Type of the object + * @return Copy of the original value + */ + public static T createCopy( + UnsafeByteArrayOutputStream reusableOut, + UnsafeReusableByteArrayInput reusableIn, T original, + ImmutableClassesGiraphConfiguration conf) { + T copy = (T) createWritable(original.getClass(), conf); + + try { + reusableOut.reset(); + original.write(reusableOut); + reusableIn.initialize( + reusableOut.getByteArray(), 0, reusableOut.getPos()); + copy.readFields(reusableIn); + + if (reusableIn.available() != 0) { + throw new RuntimeException("Serialization of " + + original.getClass() + " encountered issues, " + + reusableIn.available() + " bytes left to be read"); + } + } catch (IOException e) { + throw new IllegalStateException( + "IOException occurred while trying to create a copy " + + original.getClass(), e); + } + return copy; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java new file mode 100644 index 0000000..e0e23b0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; + +import java.io.DataInput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +/** + * Thin wrapper around DataInput so it can be used as an + * {@link java.io.InputStream} + * + * For use with {@link com.esotericsoftware.kryo.io.Input} + */ +public class DataInputWrapperStream extends InputStream { + /** Wrapped DataInput object */ + private DataInput in; + + public void setDataInput(DataInput in) { + this.in = in; + } + + @Override + public int read() throws IOException { + try { + return in.readByte() & 0xFF; + } catch (EOFException e) { + throw new RuntimeException( + "Chunked input should never read more than chunked output wrote", e); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + in.readFully(b, off, len); + return len; + } catch (EOFException e) { + throw new RuntimeException( + "Chunked input should never read more than chunked output wrote", e); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java 
b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java new file mode 100644 index 0000000..3518f67 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.writable.kryo; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; + +/** + * Thin wrapper around a DataOutput so it can be used as an + * {@link java.io.OutputStream} + * + * For use with {@link com.esotericsoftware.kryo.io.Output}) + */ +public class DataOutputWrapperStream extends OutputStream { + /** Wrapped DataOutput object */ + private DataOutput out; + + public void setDataOutput(DataOutput out) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java new file mode 100644 index 0000000..b4f2bfa --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.types.ops.collections.BasicSet; +import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable; +import org.apache.giraph.writable.kryo.markers.NonKryoWritable; +import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer; +import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer; +import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer; +import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer; +import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.factories.SerializerFactory; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.InputChunked; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.io.OutputChunked; +import com.esotericsoftware.kryo.pool.KryoCallback; +import com.esotericsoftware.kryo.pool.KryoFactory; +import com.esotericsoftware.kryo.pool.KryoPool; +import com.esotericsoftware.kryo.serializers.ClosureSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import 
com.esotericsoftware.kryo.util.ObjectMap; +import com.google.common.base.Preconditions; + +import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; + +/** + * Kryo instance that provides serialization through DataInput/DataOutput + * that org.apache.hadoop.io.Writable uses. + * + * All public APIs are static. + * + * It extends Kryo to reuse KryoPool functionality, but have additional needed + * objects cached as well. If we move to ThreadLocal or other caching + * technique, we can use composition, instead of inheritance here. + */ +public class HadoopKryo extends Kryo { + /** Pool of reusable Kryo objects, since they are expensive to create */ + private static final KryoPool KRYO_POOL = new KryoPool.Builder( + new KryoFactory() { + @Override + public Kryo create() { + return createKryo(); + } + }).build(); + + /** + * List of interfaces/parent classes that will not be allowed to be + * serialized, together with explanation of why, that will be shown + * when throwing such exception + */ + private static final Map, String> NON_SERIALIZABLE; + + static { + NON_SERIALIZABLE = new LinkedHashMap<>(); + NON_SERIALIZABLE.put( + NonKryoWritable.class, + "it is marked to not allow serialization, " + + "look at the class for more details"); + NON_SERIALIZABLE.put( + KryoWritableWrapper.class, "recursion is dissallowed"); + NON_SERIALIZABLE.put( + Configuration.class, + "it cannot be supported since it contains ClassLoader"); + NON_SERIALIZABLE.put( + GiraphConfigurationSettable.class, "configuration cannot be set"); + NON_SERIALIZABLE.put( + Configurable.class, "configuration cannot be set"); + NON_SERIALIZABLE.put( + Random.class, + "it should be rarely serialized, since it would create same stream " + + "of numbers everywhere, use TransientRandom instead"); + } + + // Use chunked streams, so within same stream we can use both kryo and + // non-kryo serialization. 
+ /** Reusable Input object */ + private final InputChunked input = new InputChunked(4096); + /** Reusable Output object */ + private final OutputChunked output = new OutputChunked(4096); + + /** Reusable DataInput wrapper stream */ + private final DataInputWrapperStream dataInputWrapperStream = + new DataInputWrapperStream(); + /** Reusable DataOutput wrapper stream */ + private final DataOutputWrapperStream dataOutputWrapperStream = + new DataOutputWrapperStream(); + + /** + * Map of already initialized serializers used + * for readIntoObject/writeOutOfObject pair of methods + */ + private final ObjectMap, ReusableFieldSerializer> + classToIntoSerializer = new ObjectMap<>(); + + /** Hide constructor, so all access go through pool of cached objects */ + private HadoopKryo() { + } + + // Public API: + + /** + * Write type of given object and the object itself to the output stream. + * Inverse of readClassAndObject. + * + * @param out Output stream + * @param object Object to write + */ + public static void writeClassAndObject( + final DataOutput out, final Object object) { + writeInternal(out, object, false); + } + + /** + * Read object from the input stream, by reading first type of the object, + * and then all of it's fields. + * Inverse of writeClassAndObject. + * + * @param in Input stream + * @return Deserialized object + * @param Type of the object being read + */ + public static T readClassAndObject(DataInput in) { + return readInternal(in, null, false); + } + + /** + * Write an object to output, in a way that can be read by readIntoObject. + * + * @param out Output stream + * @param object Object to be written + */ + public static void writeOutOfObject( + final DataOutput out, final Object object) { + writeInternal(out, object, true); + } + + /** + * Reads an object, from input, into a given object, + * allowing object reuse. + * Inverse of writeOutOfObject. 
+ * + * @param in Input stream + * @param object Object to fill from input + */ + public static void readIntoObject(DataInput in, Object object) { + readInternal(in, object, true); + } + + /** + * Create copy of the object, by magically recursively copying + * all of it's fields, keeping reference structures (like cycles) + * + * @param object Object to be copied + * @return Copy of the object. + * @param Type of the object + */ + public static T createCopy(final T object) { + return KRYO_POOL.run(new KryoCallback() { + @Override + public T execute(Kryo kryo) { + return kryo.copy(object); + } + }); + } + + // Private implementation: + + /** + * Create new instance of HadoopKryo, properly initialized. + * + * @return New HadoopKryo instnace + */ + private static HadoopKryo createKryo() { + HadoopKryo kryo = new HadoopKryo(); + + String version = System.getProperty("java.version"); + char minor = version.charAt(2); + if (minor >= '8') { + try { + kryo.register(Class.forName("java.lang.invoke.SerializedLambda")); + kryo.register(Class.forName("com.esotericsoftware.kryo.Kryo$Closure"), + new ClosureSerializer()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException( + "Trying to use Kryo on >= Java 8 (" + version + + "), but unable to find needed classes", e); + } + } + + kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer()); + kryo.register(Collections.nCopies(1, new Object()).getClass(), + new CollectionsNCopiesSerializer()); + + ImmutableListSerializer.registerSerializers(kryo); + + // TODO move guava version to 18.0, and remove this fix: + try { + kryo.register( + Class.forName("com.google.common.collect.RegularImmutableList"), + new ImmutableListSerializer()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException( + "Guava has RegularImmutableList missing", e); + } + + // There are many fastutil classes, register them at the end, + // so they don't use up small registration numbers + 
FastUtilSerializer.registerAll(kryo); + + kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy( + new StdInstantiatorStrategy())); + + kryo.setDefaultSerializer(new SerializerFactory() { + @SuppressWarnings("rawtypes") + @Override + public Serializer makeSerializer(Kryo kryo, final Class type) { + for (final Entry, String> entry : + NON_SERIALIZABLE.entrySet()) { + if (entry.getKey().isAssignableFrom(type)) { + // Allow Class object to be serialized, but not a live instance. + return new Serializer() { + @Override + public Object read(Kryo kryo, Input input, Class type) { + throw new RuntimeException("Cannot serialize " + type + + ". Objects being serialized cannot capture " + + entry.getKey() + " because " + entry.getValue() + + ". Either remove field in question" + + ", or make it transient (so that it isn't serialized)"); + } + + @Override + public void write(Kryo kryo, Output output, Object object) { + throw new RuntimeException("Cannot serialize " + type + + ". Objects being serialized cannot capture " + + entry.getKey() + " because " + entry.getValue() + + ". Either remove field in question" + + ", or make it transient (so that it isn't serialized)"); + } + }; + } + } + + if (Writable.class.isAssignableFrom(type) && + !KryoIgnoreWritable.class.isAssignableFrom(type) && + // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily, + // for lack of constructors + !BasicSet.class.isAssignableFrom(type) && + !BasicArrayList.class.isAssignableFrom(type) && + !Basic2ObjectMap.class.isAssignableFrom(type)) { + // use the Writable method defined by the type + DirectWritableSerializer serializer = new DirectWritableSerializer(); + return serializer; + } else { + FieldSerializer serializer = new FieldSerializer<>(kryo, type); + serializer.setIgnoreSyntheticFields(false); + return serializer; + } + } + }); + + return kryo; + } + + /** + * Initialize reusable objects for reading from given DataInput. 
+ * + * @param in Input stream + */ + private void setDataInput(DataInput in) { + dataInputWrapperStream.setDataInput(in); + input.setInputStream(dataInputWrapperStream); + } + + /** + * Initialize reusable objects for writing into given DataOutput. + * + * @param out Output stream + */ + private void setDataOutput(DataOutput out) { + dataOutputWrapperStream.setDataOutput(out); + output.setOutputStream(dataOutputWrapperStream); + } + + /** + * Get or create reusable serializer for given class. + * + * @param type Type of the object + * @return Serializer + */ + private ReusableFieldSerializer getOrCreateReusableSerializer( + Class type) { + ReusableFieldSerializer serializer = + classToIntoSerializer.get(type); + if (serializer == null) { + serializer = new ReusableFieldSerializer<>(this, type); + classToIntoSerializer.put(type, serializer); + } + return serializer; + } + + /** + * Internal write implementation, that reuses HadoopKryo objects + * from the pool. + * + * @param out Output stream + * @param object Object to be written + * @param outOf whether we are writing reusable objects, + * or full objects with class name + */ + private static void writeInternal( + final DataOutput out, final Object object, final boolean outOf) { + KRYO_POOL.run(new KryoCallback() { + @Override + public Void execute(Kryo kryo) { + HadoopKryo hkryo = (HadoopKryo) kryo; + hkryo.setDataOutput(out); + + if (outOf) { + hkryo.writeOutOfObject(hkryo.output, object); + } else { + hkryo.writeClassAndObject(hkryo.output, object); + } + + hkryo.output.endChunks(); + hkryo.output.close(); + + return null; + } + }); + } + + /** + * Internal read implementation, that reuses HadoopKryo objects + * from the pool. 
+ * + * @param in Input stream + * @param outObject Object to fill from input (if not null) + * @param into whether we are reading reusable objects, + * or full objects with class name + * @return Read object (new one, or same passed in if we use reusable) + * @param Type of the object to read + */ + @SuppressWarnings("unchecked") + private static T readInternal( + final DataInput in, final T outObject, final boolean into) { + return KRYO_POOL.run(new KryoCallback() { + @Override + public T execute(Kryo kryo) { + HadoopKryo hkryo = (HadoopKryo) kryo; + hkryo.setDataInput(in); + + T object; + if (into) { + hkryo.readIntoObject(hkryo.input, outObject); + object = outObject; + } else { + object = (T) hkryo.readClassAndObject(hkryo.input); + } + hkryo.input.nextChunks(); + + hkryo.input.close(); + return object; + } + }); + } + + /** + * Reads an object, from input, into a given object, + * allowing object reuse. + * + * @param input Input stream + * @param object Object to fill from input + */ + private void readIntoObject(Input input, Object object) { + Preconditions.checkNotNull(object); + + Class type = object.getClass(); + ReusableFieldSerializer serializer = + getOrCreateReusableSerializer(type); + + serializer.setReadIntoObject(object); + Object result = readObject(input, type, serializer); + + Preconditions.checkState(result == object); + } + + /** + * Write an object to output, in a way that can be read + * using readIntoObject. 
+ * @param output Output stream + * @param object Object to be written + */ + private void writeOutOfObject(Output output, Object object) { + ReusableFieldSerializer serializer = + getOrCreateReusableSerializer(object.getClass()); + writeObject(output, object, serializer); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java new file mode 100644 index 0000000..1e03888 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.writable.kryo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable; + +/** + * Class which you can extend to get all serialization/deserialization + * done automagically + */ +public abstract class KryoWritable implements KryoIgnoreWritable { + @Override + public final void write(DataOutput out) throws IOException { + HadoopKryo.writeOutOfObject(out, this); + } + + @Override + public final void readFields(DataInput in) throws IOException { + HadoopKryo.readIntoObject(in, this); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java new file mode 100644 index 0000000..0f6e73f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.writable.kryo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Generic wrapper object, making any object writable. + * + * Uses Kryo inside for serialization. + * Current configuration is not optimized for performance, + * but Writable interface doesn't allow much room for it. + * + * Note - Java8 lambdas need to implement Serializable to work. + * + * @param Object type + */ +public class KryoWritableWrapper implements Writable { + /** Wrapped object */ + private T object; + + /** + * Create wrapper given an object. + * @param object Object instance + */ + public KryoWritableWrapper(T object) { + this.object = object; + } + + /** + * Creates wrapper initialized with null. + */ + public KryoWritableWrapper() { + } + + /** + * Unwrap the object value + * @return Object value + */ + public T get() { + return object; + } + + /** + * Set wrapped object value + * @param object New object value + */ + public void set(T object) { + this.object = object; + } + + @Override + public void readFields(DataInput in) throws java.io.IOException { + object = HadoopKryo.readClassAndObject(in); + } + + @Override + public void write(DataOutput out) throws IOException { + HadoopKryo.writeClassAndObject(out, object); + } + + /** + * Returns Writable instance, wrapping given object only + * if it is not already writable. + * + * @param object Object to potentially wrap + * @return Writable object holding argument + */ + public static Writable wrapIfNeeded(Object object) { + if (object instanceof Writable) { + return (Writable) object; + } else { + return new KryoWritableWrapper<>(object); + } + } + + /** + * Unwrap Writable object if it was wrapped initially, + * inverse of wrapIfNeeded function. + * @param value Potentially wrapped value + * @return Original unwrapped value + * @param Type of returned object. 
+ */ + public static T unwrapIfNeeded(Writable value) { + if (value instanceof KryoWritableWrapper) { + return ((KryoWritableWrapper) value).get(); + } else { + return (T) value; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java new file mode 100644 index 0000000..cd26be8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; + +import java.util.Random; + +/** + * Transient Random class. Seed/state is not kept after + * serializing/deserializing. + * + * Within Blocks Framework - if we initialize Random within the Piece, when + * it's serialzied and copied to all workers and all threads - keeping seed + * would cause same series of random numbers to be generated everywhere. 
+ * + * So this class is safe to be used in Pieces, while using regular Random + * class is forbidden to be serialized. + * Best approach would be to not have Random serialized, and create it on + * workers, where possible. + */ +public class TransientRandom { + /** Instance of random object */ + private final transient Random random = new Random(); + + /** + * Get instance of Random + * @return Random instance + */ + public Random get() { + return random; + } + + /** + * Returns a pseudorandom, uniformly distributed {@code int} value + * between 0 (inclusive) and the specified value (exclusive), drawn from + * this random number generator's sequence. + * + * @param n Given upper limit + * @return pseudorandom integer number in [0, n) range. + */ + public int nextInt(int n) { + return random.nextInt(n); + } + + /** + * Returns the next pseudorandom, uniformly distributed + * {@code double} value between {@code 0.0} and + * {@code 1.0} from this random number generator's sequence. + * + * @return pseudorandom number in [0, 1) + */ + public double nextDouble() { + return random.nextDouble(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java new file mode 100644 index 0000000..58dd78d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Marker interface, specifying that Kryo should serialize it on its own,
+ * and ignore actual Writable method implementations.
+ *
+ * If you are using HadoopKryo.writeOutOfObject/readIntoObject, the result is
+ * the same, and adding it allows wrapping Kryo context into Writable context,
+ * and then wrapping it back into Kryo context.
+ */
+public interface KryoIgnoreWritable extends Writable {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
new file mode 100644
index 0000000..66840c5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+
+/**
+ * Marker interface declaring that a class should never be serialized;
+ * any attempt to serialize an implementing class is a coding error.
+ */
+public interface NonKryoWritable {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
new file mode 100644
index 0000000..5fcfa72
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Marker interfaces that HadoopKryo uses to special-case serialization.
+ */
+package org.apache.giraph.writable.kryo.markers;
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
new file mode 100644
index 0000000..a04bb65
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for using Kryo to efficiently serialize objects,
+ * and to integrate them with the Writable interface.
+ * HadoopKryo is the main class doing serialization.
+ */
+package org.apache.giraph.writable.kryo;
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
new file mode 100644
index 0000000..3d66eb7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.giraph.writable.kryo.serializers; + +import java.util.Arrays; +import java.util.List; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Special serializer for Arrays.asList() as they can not be + * deserialized in a standard way. + * {@see + * https://groups.google.com/forum/#!msg/kryo-users/2lXTCEOSxA0/gLzIZRtaNCUJ} + */ +public class ArraysAsListSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, List object) { + kryo.writeObject(output, object.toArray(new Object[object.size()])); + } + + @Override + public List read(Kryo kryo, Input input, Class type) { + return Arrays.asList(kryo.readObject(input, Object[].class)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java new file mode 100644 index 0000000..5460692 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo.serializers; + +import java.util.Collections; +import java.util.List; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Special serializer for Collections.nCopies + * + * @param Element type + */ +public class CollectionsNCopiesSerializer extends Serializer> { + @Override + public void write(Kryo kryo, Output output, List object) { + output.writeInt(object.size(), true); + if (object.size() > 0) { + kryo.writeClassAndObject(output, object.get(0)); + } + } + + @Override + public List read(Kryo kryo, Input input, Class> type) { + int size = input.readInt(true); + if (size > 0) { + T object = (T) kryo.readClassAndObject(input); + return Collections.nCopies(size, object); + } else { + return Collections.emptyList(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java new file mode 100644 index 0000000..1967e6a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo.serializers; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.hadoop.io.Writable; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * A custom Serializer that will call the Writable methods defined by the + * object itself to serialize the object, instead of Kryo auto-magically + * serializing + * + * @param Object type, should implement Writable + */ + +public class DirectWritableSerializer + extends Serializer { + + @Override + public void write(Kryo kryo, Output output, T object) { + try { + object.write(new DataOutputStream(output)); + } catch (IOException e) { + throw new RuntimeException( + "DirectWritableSerializer.write calling Writable method of class: " + + object.getClass().getName() + " encountered issues", e); + } + } + + @Override + public T read(Kryo kryo, Input input, Class type) { + try { + T object = create(kryo, input, type); + kryo.reference(object); + object.readFields(new 
DataInputStream(input)); + + return object; + } catch (IOException e) { + throw new RuntimeException( + "DirectWritableSerializer.read calling Writable method of class: " + + type.getName() + " encountered issues", e); + } + } + + /** + * Used by {@link #read(Kryo, Input, Class)} to create the new object. + * This can be overridden to customize object creation, eg to call a + * constructor with arguments. The default implementation + * uses {@link Kryo#newInstance(Class)}. + * + * @param kryo Kryo object instance + * @param input Input + * @param type Type of the class to create + * @return New instance of wanted type + */ + protected T create(Kryo kryo, Input input, Class type) { + return ReflectionUtils.newInstance(type); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java new file mode 100644 index 0000000..0eb9676 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo.serializers; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.FieldSerializer; + +/** + * Kryo Serializer for Fastutil collection class. + * By default, because they extend boxed collections, are being serialized very + * inefficiently through a lot of temporary object creation. + * + * We are relying that fastutil classes are written to be correctly serialized + * with Java serialization, and have put transient on all array fields and are + * doing custom efficient serialization in writeObject/readObject methods. + * This Serializer then swaps ObjectOutputStream for all default fields with + * FieldSerializer, and then calls appropriate writeObject/readObject methods. 
+ * We are also relying on defaultWriteObject/defaultReadObject being + * effectively called first within those methods + * + * @param Object type + */ +public class FastUtilSerializer extends Serializer { + /** List of all types generated by fastutil */ + private static final String[] PRIMITIVE_TYPES = new String[] { + "Boolean", "Byte", "Short", "Int", "Long", "Float", "Double", "Char", + "Object"}; + /** List of all types used as keys in fastutil */ + private static final String[] PRIMITIVE_KEY_TYPES = new String[] { + "Byte", "Short", "Int", "Long", "Float", "Double", "Char", "Object"}; + + /** Field serializer for this fastutil class */ + private final FieldSerializer fieldSerializer; + + /** Handle to writeObject Method on this fastutil class*/ + private final Method writeMethod; + /** Handle to readObject Method on this fastutil class*/ + private final Method readMethod; + /** Reusable output stream wrapper */ + private final FastUtilSerializer.FastutilKryoObjectOutputStream outputWrapper; + /** Reusable input stream wrapper */ + private final FastUtilSerializer.FastutilKryoObjectInputStream inputWrapper; + + /** + * Creates and initializes new serializer for a given fastutil class. 
+ * @param kryo Kryo instance + * @param type Fastutil class + */ + public FastUtilSerializer(Kryo kryo, Class type) { + fieldSerializer = new FieldSerializer<>(kryo, type); + fieldSerializer.setIgnoreSyntheticFields(false); + + try { + writeMethod = type.getDeclaredMethod( + "writeObject", ObjectOutputStream.class); + writeMethod.setAccessible(true); + readMethod = type.getDeclaredMethod( + "readObject", ObjectInputStream.class); + readMethod.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + "Fastutil class " + type + + " doesn't have readObject/writeObject methods", e); + } + + try { + outputWrapper = new FastutilKryoObjectOutputStream(); + inputWrapper = new FastutilKryoObjectInputStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Register serializer for a given fastutil class. + * @param kryo Kryo instance + * @param fastutilClass Fastutil class + */ + public static void register(Kryo kryo, Class fastutilClass) { + kryo.register(fastutilClass, new FastUtilSerializer<>(kryo, fastutilClass)); + } + + /** + * Registers serializers for all possible fastutil classes. + * + * There are many fastutil classes, so it is recommended to call this + * function at the end, so they fastutil classes don't use up small + * registration numbers. + * + * @param kryo Kryo instance + */ + public static void registerAll(Kryo kryo) { + registerArrayLists(kryo); + registerArrayBigList(kryo); + registerOpenHashSets(kryo); + registerArraySets(kryo); + registerRBTreeSets(kryo); + registerAVLTreeSets(kryo); + registerOpenHashMaps(kryo); + registerRBTreeMaps(kryo); + registerAVLTreeMaps(kryo); + + // Note - HeapPriorityQueues don't extend boxed collection, + // and so they work out of the box correctly + } + + /** + * Register all Fastutil ArrayLists. 
+ * + * @param kryo Kryo instance + */ + public static void registerArrayLists(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_ArrayList", PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil ArrayBigLists. + * + * @param kryo Kryo instance + */ + public static void registerArrayBigList(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_BigArrayBigList", PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil OpenHashSets. + * + * @param kryo Kryo instance + */ + public static void registerOpenHashSets(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_OpenHashSet", PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil ArraySets. + * + * @param kryo Kryo instance + */ + public static void registerArraySets(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_ArraySet", PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil RBTreeSets. + * + * @param kryo Kryo instance + */ + public static void registerRBTreeSets(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_RBTreeSet", PRIMITIVE_KEY_TYPES)); + } + + /** + * Register all Fastutil AVLTreeSets. + * + * @param kryo Kryo instance + */ + public static void registerAVLTreeSets(Kryo kryo) { + registerAll(kryo, singleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_AVLTreeSet", PRIMITIVE_KEY_TYPES)); + } + + /** + * Register all Fastutil OpenHashMaps. + * + * @param kryo Kryo instance + */ + public static void registerOpenHashMaps(Kryo kryo) { + registerAll(kryo, doubleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_2_T2_OpenHashMap", + PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil RBTreeMaps. 
+ * + * @param kryo Kryo instance + */ + public static void registerRBTreeMaps(Kryo kryo) { + registerAll(kryo, doubleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_2_T2_RBTreeMap", + PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES)); + } + + /** + * Register all Fastutil AVLTreeMaps. + * + * @param kryo Kryo instance + */ + public static void registerAVLTreeMaps(Kryo kryo) { + registerAll(kryo, doubleTypes( + "it.unimi.dsi.fastutil._t1_s._T1_2_T2_AVLTreeMap", + PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES)); + } + + /** + * Register all class from the list of classes. + * + * @param kryo Kryo instance + * @param types List of classes + */ + private static void registerAll(Kryo kryo, ArrayList> types) { + for (Class type : types) { + register(kryo, type); + } + } + + /** + * Returns list of all classes that are generated by using given + * pattern, and replacing it with passed list of types. + * Pattern contains _t1_ and _T1_, for lowercase and actual name. + * + * @param pattern Given pattern + * @param types Given list of strings to replace into pattern + * @return List of all classes + */ + private static ArrayList> singleTypes( + String pattern, String[] types) { + ArrayList> result = new ArrayList<>(); + + for (String type : types) { + try { + result.add(Class.forName( + pattern.replaceAll("_T1_", type).replaceAll( + "_t1_", type.toLowerCase()))); + } catch (ClassNotFoundException e) { + throw new RuntimeException(pattern + " " + type, e); + } + } + return result; + } + + /** + * Returns list of all classes that are generated by using given + * pattern, and replacing it with passed list of types. + * Pattern contains two variable pairs: _t1_, _T1_ and _t2_, _T2_, + * in each pair one for lowercase and one for actual name. 
+ * + * @param pattern Given pattern + * @param types1 Given list of strings to replace t1 into pattern + * @param types2 Given list of strings to replace t2 into pattern + * @return List of all classes + */ + private static ArrayList> doubleTypes( + String pattern, String[] types1, String[] types2) { + ArrayList> result = new ArrayList<>(); + + for (String type1 : types1) { + for (String type2 : types2) { + try { + result.add(Class.forName( + pattern.replaceAll("_T1_", type1).replaceAll( + "_t1_", type1.toLowerCase()) + .replaceAll("_T2_", type2).replaceAll( + "_t2_", type2.toLowerCase()))); + } catch (ClassNotFoundException e) { + throw new RuntimeException(pattern + " " + type1 + " " + type2, e); + } + } + } + return result; + } + + @Override + public void write(Kryo kryo, Output output, T object) { + fieldSerializer.write(kryo, output, object); + + outputWrapper.set(output, kryo); + try { + writeMethod.invoke(object, outputWrapper); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("writeObject failed", e); + } + } + + @Override + public T read(Kryo kryo, Input input, Class type) { + T result = fieldSerializer.read(kryo, input, type); + + if (result != null) { + inputWrapper.set(input, kryo); + try { + readMethod.invoke(result, inputWrapper); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("readObject failed", e); + } + } + + return result; + } + + /** + * Wrapper around ObjectOutputStream that ignores defaultWriteObject (assumes + * that needed logic was already executed before), and passes all other calls + * to Output + */ + private static class FastutilKryoObjectOutputStream + extends ObjectOutputStream { + /** Output */ + private Output output; + /** Kryo */ + private Kryo kryo; + + /** Constructor */ + FastutilKryoObjectOutputStream() throws IOException { + super(); + } + + /** + * Setter + * + * @param output Output + * @param kryo kryo + */ + public void 
set(Output output, Kryo kryo) { + this.output = output; + this.kryo = kryo; + } + + @Override + public void defaultWriteObject() throws IOException { + } + + @Override + public void writeBoolean(boolean val) throws IOException { + output.writeBoolean(val); + } + + @Override + public void writeByte(int val) throws IOException { + output.writeByte(val); + } + + @Override + public void writeShort(int val) throws IOException { + output.writeShort(val); + } + + @Override + public void writeChar(int val) throws IOException { + output.writeChar((char) val); + } + + @Override + public void writeInt(int val) throws IOException { + output.writeInt(val, false); + } + + @Override + public void writeLong(long val) throws IOException { + output.writeLong(val, false); + } + + @Override + public void writeFloat(float val) throws IOException { + output.writeFloat(val); + } + + @Override + public void writeDouble(double val) throws IOException { + output.writeDouble(val); + } + + @Override + protected void writeObjectOverride(Object obj) throws IOException { + kryo.writeClassAndObject(output, obj); + } + } + + /** + * Wrapper around ObjectOutputStream that ignores defaultReadObject + * (assumes that needed logic was already executed before), and passes + * all other calls to Output + */ + private static class FastutilKryoObjectInputStream extends ObjectInputStream { + /** Input */ + private Input input; + /** Kryo */ + private Kryo kryo; + + /** Constructor */ + FastutilKryoObjectInputStream() throws IOException { + super(); + } + + /** + * Setter + * + * @param input Input + * @param kryo Kryo + */ + public void set(Input input, Kryo kryo) { + this.input = input; + this.kryo = kryo; + } + + @Override + public void defaultReadObject() throws IOException, ClassNotFoundException { + } + + @Override + public boolean readBoolean() throws IOException { + return input.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + return input.readByte(); + } + + 
@Override + public char readChar() throws IOException { + return input.readChar(); + } + + @Override + public short readShort() throws IOException { + return input.readShort(); + } + + @Override + public int readInt() throws IOException { + return input.readInt(false); + } + + @Override + public long readLong() throws IOException { + return input.readLong(false); + } + + @Override + public float readFloat() throws IOException { + return input.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return input.readDouble(); + } + + @Override + protected Object readObjectOverride() + throws IOException, ClassNotFoundException { + return kryo.readClassAndObject(input); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java new file mode 100644 index 0000000..f11139d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo.serializers; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; + +/** + * Serializer used to deserialize data into object, instead of creating a new + * value readIntoObject needs to be set right before deserialization is called + * on it. + * + * @param Object type + */ +public class ReusableFieldSerializer extends FieldSerializer { + /** Current object into which to deserialize */ + private T readIntoObject; + + /** + * Creates new reusable field serializer for a given type. 
+ * @param kryo HadoopKryo object + * @param type Type of object + */ + public ReusableFieldSerializer(Kryo kryo, Class type) { + super(kryo, type); + } + + public void setReadIntoObject(T value) { + this.readIntoObject = value; + } + + @Override + protected T create(Kryo kryo, Input input, Class type) { + Preconditions.checkNotNull(readIntoObject); + Preconditions.checkState(readIntoObject.getClass().equals(type)); + T toReturn = readIntoObject; + readIntoObject = null; + return toReturn; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java new file mode 100644 index 0000000..9eabf96 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Implementations of custom serializers needed for HadoopKryo + */ +package org.apache.giraph.writable.kryo.serializers; http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java new file mode 100644 index 0000000..3898b82 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; + +import static org.junit.Assert.assertEquals; +import it.unimi.dsi.fastutil.longs.LongArrayList; + +import java.util.Arrays; +import java.util.List; + +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; +import org.junit.Test; + + + +/** + * Tests some subtle cases of kryo serialization. 
+ */ +public class KryoWritableTest { + public static class TestClassA extends KryoWritable { + final String testObject; + final List list; + final int something; + + public TestClassA(String testObject, List list, int something) { + this.testObject = testObject; + this.list = list; + this.something = something; + } + + public TestClassA() { + this.testObject = null; + this.list = null; + this.something = -1; + } + } + + @Test + public void testTestClassA() throws Exception { + String testObject = "Hello World!"; + TestClassA res = new TestClassA(); + WritableUtils.copyInto( + new TestClassA(testObject, Arrays.asList(1, 2, 3), 5), res, true); + + assertEquals(testObject, res.testObject); + + assertEquals(3, res.list.size()); + assertEquals(1, res.list.get(0)); + assertEquals(2, res.list.get(1)); + assertEquals(3, res.list.get(2)); + + assertEquals(5, res.something); + } + + public static class LongKryoWritable extends KryoWritable { + private long value; + + public LongKryoWritable(long value) { + this.value = value; + } + + public long get() { + return value; + } + + public void set(long value) { + this.value = value; + } + } + + + int multiplier = 5000; // use 5000 for profiling + int longTestTimes = 1000 * multiplier; + + @Test + public void testLongKryoWritable() throws Exception { + LongKryoWritable from = new LongKryoWritable(0); + LongKryoWritable to = new LongKryoWritable(0); + + for (int i = 0; i < longTestTimes; i++) { + from.set(i); + WritableUtils.copyInto(from, to, true); + assertEquals(i, to.get()); + } + } + + @Test + public void testLongWritable() throws Exception { + LongWritable from = new LongWritable(0); + LongWritable to = new LongWritable(0); + + for (int i = 0; i < longTestTimes; i++) { + from.set(i); + WritableUtils.copyInto(from, to, true); + assertEquals(i, to.get()); + } + } + + public static class LongListKryoWritable extends KryoWritable { + public LongArrayList value; + + public LongListKryoWritable(LongArrayList value) { + this.value 
= value; + } + } + + int longListTestTimes = 1 * multiplier; + int longListTestSize = 100000; + + @Test + public void testLongListKryoWritable() throws Exception { + LongArrayList list = new LongArrayList(longListTestSize); + for (int i = 0; i < longListTestSize; i++) { + list.add(i); + } + + LongListKryoWritable from = new LongListKryoWritable(list); + LongListKryoWritable to = new LongListKryoWritable(null); + + for (int i = 0; i < longListTestTimes; i++) { + from.value.set((2 * i) % longListTestSize, 0); + WritableUtils.copyInto(from, to, true); + } + } + + @Test + public void testLongListWritable() throws Exception { + BasicLongArrayList from = new BasicLongArrayList(longListTestSize); + LongWritable value = new LongWritable(); + for (int i = 0; i < longListTestSize; i++) { + value.set(i); + from.add(value); + } + + BasicLongArrayList to = new BasicLongArrayList(longListTestSize); + value.set(0); + + for (int i = 0; i < longListTestTimes; i++) { + from.set((2 * i) % longListTestSize, value); + WritableUtils.copyInto(from, to, true); + } + } + + public static class NestedKryoWritable extends KryoWritable { + public LongKryoWritable value1; + public T value2; + + public NestedKryoWritable(LongKryoWritable value1, T value2) { + this.value1 = value1; + this.value2 = value2; + } + } + + @Test + public void testNestedKryoWritable() throws Exception { + LongKryoWritable inner = new LongKryoWritable(5); + NestedKryoWritable res = new NestedKryoWritable<>(null, null); + WritableUtils.copyInto( + new NestedKryoWritable<>(inner, inner), res, true); + + assertEquals(5, res.value1.get()); + Assert.assertTrue(res.value1 == res.value2); + } + + @Test + public void testRecursiveKryoWritable() throws Exception { + LongKryoWritable inner = new LongKryoWritable(5); + NestedKryoWritable wanted = new NestedKryoWritable<>(inner, null); + wanted.value2 = wanted; + + NestedKryoWritable res = new NestedKryoWritable<>(null, null); + WritableUtils.copyInto(wanted, res, true); + + 
assertEquals(5, res.value1.get()); + Assert.assertTrue(res == res.value2); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java new file mode 100644 index 0000000..2291f16 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.writable.kryo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import it.unimi.dsi.fastutil.chars.Char2ObjectMap; +import it.unimi.dsi.fastutil.chars.Char2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.floats.FloatArrayList; +import it.unimi.dsi.fastutil.ints.Int2BooleanMap; +import it.unimi.dsi.fastutil.ints.Int2BooleanOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.writable.kryo.markers.NonKryoWritable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; + + + +/** + * Tests some subtle cases of kryo serialization. 
+ */ +public class KryoWritableWrapperTest { + public static T kryoSerDeser(T t) throws IOException { + KryoWritableWrapper wrapped = new KryoWritableWrapper<>(t); + KryoWritableWrapper deser = new KryoWritableWrapper<>(); + WritableUtils.copyInto(wrapped, deser, true); + return deser.get(); + } + + @Test + public void testArraysAsList() throws IOException { + List res = kryoSerDeser(Arrays.asList(1, 2, 3)); + + assertEquals(3, res.size()); + assertEquals(1, res.get(0)); + assertEquals(2, res.get(1)); + assertEquals(3, res.get(2)); + } + + + @Test + public void testArraysAsListMultiRef() throws IOException { + List list = Arrays.asList(1, 2, 3); + Object obj = new Object(); + List wanted = Arrays.asList(list, list, obj, obj, null); + wanted.set(4, wanted); + List res = kryoSerDeser(wanted); + + assertTrue(res.get(0) == res.get(1)); + assertTrue(res.get(2) == res.get(3)); + // TODO see if this can be supported, though this is a rare case: + // assertTrue(res == res.get(4)); + } + + @Test + public void testCollectionsNCopiesList() throws IOException { + List res = kryoSerDeser(Collections.nCopies(3, 42)); + + assertEquals(3, res.size()); + assertEquals(42, res.get(0)); + assertEquals(42, res.get(1)); + assertEquals(42, res.get(2)); + } + + @Test + public void testCollectionsNCopiesObjectList() throws IOException { + String testObject = "Hello World!"; + List res = kryoSerDeser(Collections.nCopies(3, testObject)); + + assertEquals(3, res.size()); + assertEquals(testObject, res.get(0)); + assertEquals(testObject, res.get(1)); + assertEquals(testObject, res.get(2)); + } + + @Test + public void testUnmodifiableIterator() throws IOException { + Iterator in = Iterables.concat( + Arrays.asList(0, 1), + Arrays.asList(2, 3), + Arrays.asList(4, 5)).iterator(); + + in.next(); + in.next(); + in.next(); + Iterator res = kryoSerDeser(in); + + int cnt = 3; + for(; res.hasNext(); cnt++) { + assertEquals(cnt, res.next()); + } + assertEquals(6, cnt); + } + + @Test + public void 
testIteratorsConcat() throws IOException { + Iterator in = Iterators.concat( + Arrays.asList(0, 1).iterator(), + Arrays.asList(2, 3).iterator(), + Arrays.asList(4, 5).iterator()); + + in.next(); + in.next(); + in.next(); + + Iterator res = kryoSerDeser(in); + + int cnt = 3; + for(; res.hasNext(); cnt++) { + assertEquals(cnt, res.next()); + } + assertEquals(6, cnt); + + } + + @Test + public void testImmutableList() throws IOException { + { + List res = kryoSerDeser(ImmutableList.of(1, 2)); + assertEquals(2, res.size()); + assertEquals(1, res.get(0)); + assertEquals(2, res.get(1)); + } + + { + List list = ImmutableList.of(1, 2, 3); + Object obj = new Object(); + List wanted = ImmutableList.of(list, list, obj, obj); + List res = kryoSerDeser(wanted); + + assertTrue(res.get(0) == res.get(1)); + assertTrue(res.get(2) == res.get(3)); + } + } + + @Test + public void testFastutilSet() throws ClassNotFoundException, IOException { + LongOpenHashSet set = new LongOpenHashSet(); + set.add(6); + LongOpenHashSet deser = kryoSerDeser(set); + deser.add(5); + set.add(5); + Assert.assertEquals(set, deser); + } + + @Test + public void testFastutilLongList() throws ClassNotFoundException, IOException { + LongArrayList list = new LongArrayList(); + list.add(6); + LongArrayList deser = kryoSerDeser(list); + deser.add(5); + list.add(5); + Assert.assertEquals(list, deser); + } + + @Test + public void testFastutilFloatList() throws ClassNotFoundException, IOException { + FloatArrayList list = new FloatArrayList(); + list.add(6L); + FloatArrayList deser = kryoSerDeser(list); + deser.add(5L); + list.add(5L); + Assert.assertEquals(list, deser); + } + + @Test + public void testFastutilMap() throws ClassNotFoundException, IOException { + Int2BooleanMap list = new Int2BooleanOpenHashMap(); + list.put(6, true); + Int2BooleanMap deser = kryoSerDeser(list); + deser.put(5, false); + list.put(5, false); + Assert.assertEquals(list, deser); + } + + @Test + public void testFastutil2ObjMap() throws 
ClassNotFoundException, IOException { + Char2ObjectMap list = new Char2ObjectOpenHashMap<>(); + list.put('a', new IntWritable(6)); + list.put('q', new IntWritable(7)); + list.put('w', new IntWritable(8)); + list.put('e', new IntWritable(9)); + list.put('r', new IntWritable(7)); + list.put('c', null); + Char2ObjectMap deser = kryoSerDeser(list); + deser.put('b', null); + list.put('b', null); + + Assert.assertEquals(list, deser); + } + + @Test + @Ignore("Long test used for profiling compiling speed") + public void testLongFastutilListProfile() throws ClassNotFoundException, IOException { + int n = 100; + int rounds = 2000000; + LongArrayList list = new LongArrayList(n); + for (int i = 0; i < n; i++) { + list.add(i); + } + + for (int round = 0; round < rounds; round ++) { + LongArrayList deser = kryoSerDeser(list); + deser.add(round); + list.add(round); + Assert.assertEquals(list.size(), deser.size()); + Assert.assertArrayEquals(list.elements(), deser.elements()); + + list.popLong(); + } + } + + @Test(expected=RuntimeException.class) + public void testRandom() throws ClassNotFoundException, IOException { + kryoSerDeser(new Random()).nextBoolean(); + } + + private static class TestConf implements GiraphConfigurationSettable { + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + } + } + + @Test(expected=RuntimeException.class) + public void testConfiguration() throws ClassNotFoundException, IOException { + kryoSerDeser(new Configuration()); + } + + @Test(expected=RuntimeException.class) + public void testConfigurable() throws ClassNotFoundException, IOException { + kryoSerDeser(new TestConf()); + } + + @Test(expected=RuntimeException.class) + public void testVertexReceiver() throws ClassNotFoundException, IOException { + kryoSerDeser(new NonKryoWritable() { + }); + } + + + @Test + public void testBlacklistedClasses() throws ClassNotFoundException, IOException { + Assert.assertEquals(kryoSerDeser(Random.class), Random.class); + 
Assert.assertEquals(kryoSerDeser(TestConf.class), TestConf.class); + Assert.assertEquals(kryoSerDeser(GiraphConfiguration.class), GiraphConfiguration.class); + } + + @Test(expected=RuntimeException.class) + public void testRecursive() throws ClassNotFoundException, IOException { + kryoSerDeser(new KryoWritableWrapper<>(new Object())).get().hashCode(); + } + + @Test + public void testNull() throws ClassNotFoundException, IOException { + Assert.assertNull(kryoSerDeser(null)); + } +}