giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ikabi...@apache.org
Subject [2/2] git commit: updated refs/heads/trunk to 06a1084
Date Tue, 09 Jun 2015 16:17:32 GMT
[GIRAPH-1010] Add utilities to allow Kryo to serialize objects

Test Plan: mvn clean install

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D39513


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/06a1084a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/06a1084a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/06a1084a

Branch: refs/heads/trunk
Commit: 06a1084af99ee46a03ebbbee53e3224ca8862132
Parents: 27661ba
Author: Igor Kabiljo <ikabiljo@fb.com>
Authored: Wed Jun 3 13:35:20 2015 -0700
Committer: Igor Kabiljo <ikabiljo@fb.com>
Committed: Tue Jun 9 09:17:06 2015 -0700

----------------------------------------------------------------------
 giraph-core/pom.xml                             |  12 +
 .../org/apache/giraph/utils/WritableUtils.java  | 126 +++--
 .../writable/kryo/DataInputWrapperStream.java   |  59 +++
 .../writable/kryo/DataOutputWrapperStream.java  |  47 ++
 .../apache/giraph/writable/kryo/HadoopKryo.java | 435 +++++++++++++++++
 .../giraph/writable/kryo/KryoWritable.java      |  40 ++
 .../writable/kryo/KryoWritableWrapper.java      | 110 +++++
 .../giraph/writable/kryo/TransientRandom.java   |  69 +++
 .../kryo/markers/KryoIgnoreWritable.java        |  32 ++
 .../writable/kryo/markers/NonKryoWritable.java  |  26 +
 .../writable/kryo/markers/package-info.java     |  21 +
 .../giraph/writable/kryo/package-info.java      |  23 +
 .../serializers/ArraysAsListSerializer.java     |  44 ++
 .../CollectionsNCopiesSerializer.java           |  52 ++
 .../serializers/DirectWritableSerializer.java   |  83 ++++
 .../kryo/serializers/FastUtilSerializer.java    | 476 +++++++++++++++++++
 .../serializers/ReusableFieldSerializer.java    |  57 +++
 .../writable/kryo/serializers/package-info.java |  21 +
 .../giraph/writable/kryo/KryoWritableTest.java  | 195 ++++++++
 .../writable/kryo/KryoWritableWrapperTest.java  | 292 ++++++++++++
 pom.xml                                         |  33 ++
 21 files changed, 2217 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 6fea1a3..4719a5d 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -542,6 +542,18 @@ under the License.
       <groupId>org.python</groupId>
       <artifactId>jython</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>de.javakaffee</groupId>
+      <artifactId>kryo-serializers</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.objenesis</groupId>
+      <artifactId>objenesis</artifactId>
+    </dependency>
 
     <!-- runtime dependency -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index be2ef9d..68ed89a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -748,42 +748,6 @@ public class WritableUtils {
   }
 
   /**
-   * Create a copy of Writable object, by serializing and deserializing it.
-   *
-   * @param reusableOut Reusable output stream to serialize into
-   * @param reusableIn Reusable input stream to deserialize out of
-   * @param original Original value of which to make a copy
-   * @param conf Configuration
-   * @param <T> Type of the object
-   * @return Copy of the original value
-   */
-  public static <T extends Writable> T createCopy(
-      UnsafeByteArrayOutputStream reusableOut,
-      UnsafeReusableByteArrayInput reusableIn, T original,
-      ImmutableClassesGiraphConfiguration conf) {
-    T copy = (T) createWritable(original.getClass(), conf);
-
-    try {
-      reusableOut.reset();
-      original.write(reusableOut);
-      reusableIn.initialize(
-          reusableOut.getByteArray(), 0, reusableOut.getPos());
-      copy.readFields(reusableIn);
-
-      if (reusableIn.available() != 0) {
-        throw new RuntimeException("Serialization of " +
-            original.getClass() + " encountered issues, " +
-            reusableIn.available() + " bytes left to be read");
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "IOException occurred while trying to create a copy " +
-          original.getClass(), e);
-    }
-    return copy;
-  }
-
-  /**
    * Writes primitive int array of ints into output stream.
    * Array can be null or empty.
    * @param array array to be written
@@ -896,4 +860,94 @@ public class WritableUtils {
       return null;
     }
   }
+
+
+  /**
+   * Copy {@code from} into {@code to}, by serializing and deserializing it.
+   * Since it is creating streams inside, it's mostly useful for
+   * tests/non-performant code.
+   *
+   * @param from Object to copy from
+   * @param to Object to copy into
+   * @param <T> Type of the object
+   */
+  public static <T extends Writable> void copyInto(T from, T to) {
+    copyInto(from, to, false);
+  }
+
+  /**
+   * Copy {@code from} into {@code to}, by serializing and deserializing it.
+   * Since it is creating streams inside, it's mostly useful for
+   * tests/non-performant code.
+   *
+   * @param from Object to copy from
+   * @param to Object to copy into
+   * @param checkOverRead if true, will add one more byte at the end of writing,
+   *                      to make sure read is not touching it. Useful for tests
+   * @param <T> Type of the object
+   */
+  public static <T extends Writable> void copyInto(
+      T from, T to, boolean checkOverRead) {
+    try {
+      if (from.getClass() != to.getClass()) {
+        throw new RuntimeException(
+            "Trying to copy from " + from.getClass() +
+            " into " + to.getClass());
+      }
+
+      UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+      from.write(out);
+      if (checkOverRead) {
+        out.writeByte(0);
+      }
+
+      UnsafeByteArrayInputStream in =
+          new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos());
+      to.readFields(in);
+
+      if (in.available() != (checkOverRead ? 1 : 0)) {
+        throw new RuntimeException(
+            "Serialization encountered issues with " + from.getClass() + ", " +
+            (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Create a copy of Writable object, by serializing and deserializing it.
+   *
+   * @param reusableOut Reusable output stream to serialize into
+   * @param reusableIn Reusable input stream to deserialize out of
+   * @param original Original value of which to make a copy
+   * @param conf Configuration
+   * @param <T> Type of the object
+   * @return Copy of the original value
+   */
+  public static <T extends Writable> T createCopy(
+      UnsafeByteArrayOutputStream reusableOut,
+      UnsafeReusableByteArrayInput reusableIn, T original,
+      ImmutableClassesGiraphConfiguration conf) {
+    T copy = (T) createWritable(original.getClass(), conf);
+
+    try {
+      reusableOut.reset();
+      original.write(reusableOut);
+      reusableIn.initialize(
+          reusableOut.getByteArray(), 0, reusableOut.getPos());
+      copy.readFields(reusableIn);
+
+      if (reusableIn.available() != 0) {
+        throw new RuntimeException("Serialization of " +
+            original.getClass() + " encountered issues, " +
+            reusableIn.available() + " bytes left to be read");
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "IOException occurred while trying to create a copy " +
+          original.getClass(), e);
+    }
+    return copy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
new file mode 100644
index 0000000..e0e23b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Thin wrapper around DataInput so it can be used as an
+ * {@link java.io.InputStream}
+ *
+ * For use with {@link com.esotericsoftware.kryo.io.Input}
+ */
+public class DataInputWrapperStream extends InputStream {
+  /** Wrapped DataInput object */
+  private DataInput in;
+
+  public void setDataInput(DataInput in) {
+    this.in = in;
+  }
+
+  @Override
+  public int read() throws IOException {
+    try {
+      return in.readByte() & 0xFF;
+    } catch (EOFException e) {
+      throw new RuntimeException(
+          "Chunked input should never read more than chunked output wrote", e);
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    try {
+      in.readFully(b, off, len);
+      return len;
+    } catch (EOFException e) {
+      throw new RuntimeException(
+          "Chunked input should never read more than chunked output wrote", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
new file mode 100644
index 0000000..3518f67
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Thin wrapper around a DataOutput so it can be used as an
+ * {@link java.io.OutputStream}
+ *
+ * For use with {@link com.esotericsoftware.kryo.io.Output})
+ */
+public class DataOutputWrapperStream extends OutputStream {
+  /** Wrapped DataOutput object */
+  private DataOutput out;
+
+  public void setDataOutput(DataOutput out) {
+    this.out = out;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    out.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    out.write(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
new file mode 100644
index 0000000..b4f2bfa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
+import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
+import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
+import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
+import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.SerializerFactory;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.InputChunked;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.OutputChunked;
+import com.esotericsoftware.kryo.pool.KryoCallback;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.esotericsoftware.kryo.serializers.ClosureSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import com.google.common.base.Preconditions;
+
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+
+/**
+ * Kryo instance that provides serialization through DataInput/DataOutput
+ * that org.apache.hadoop.io.Writable uses.
+ *
+ * All public APIs are static.
+ *
+ * It extends Kryo to reuse KryoPool functionality, but have additional needed
+ * objects cached as well. If we move to ThreadLocal or other caching
+ * technique, we can use composition, instead of inheritance here.
+ */
+public class HadoopKryo extends Kryo {
+  /** Pool of reusable Kryo objects, since they are expensive to create */
+  private static final KryoPool KRYO_POOL = new KryoPool.Builder(
+      new KryoFactory() {
+        @Override
+        public Kryo create() {
+          return createKryo();
+        }
+      }).build();
+
+  /**
+   * List of interfaces/parent classes that will not be allowed to be
+   * serialized, together with explanation of why, that will be shown
+   * when throwing such exception
+   */
+  private static final Map<Class<?>, String> NON_SERIALIZABLE;
+
+  static {
+    NON_SERIALIZABLE = new LinkedHashMap<>();
+    NON_SERIALIZABLE.put(
+        NonKryoWritable.class,
+        "it is marked to not allow serialization, " +
+        "look at the class for more details");
+    NON_SERIALIZABLE.put(
+        KryoWritableWrapper.class, "recursion is dissallowed");
+    NON_SERIALIZABLE.put(
+        Configuration.class,
+        "it cannot be supported since it contains ClassLoader");
+    NON_SERIALIZABLE.put(
+        GiraphConfigurationSettable.class, "configuration cannot be set");
+    NON_SERIALIZABLE.put(
+        Configurable.class, "configuration cannot be set");
+    NON_SERIALIZABLE.put(
+        Random.class,
+        "it should be rarely serialized, since it would create same stream " +
+        "of numbers everywhere, use TransientRandom instead");
+  }
+
+  // Use chunked streams, so within same stream we can use both kryo and
+  // non-kryo serialization.
+  /** Reusable Input object */
+  private final InputChunked input = new InputChunked(4096);
+  /** Reusable Output object */
+  private final OutputChunked output = new OutputChunked(4096);
+
+  /** Reusable DataInput wrapper stream */
+  private final DataInputWrapperStream dataInputWrapperStream =
+      new DataInputWrapperStream();
+  /** Reusable DataOutput wrapper stream */
+  private final DataOutputWrapperStream dataOutputWrapperStream =
+      new DataOutputWrapperStream();
+
+  /**
+   * Map of already initialized serializers used
+   * for readIntoObject/writeOutOfObject pair of methods
+   */
+  private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
+  classToIntoSerializer = new ObjectMap<>();
+
+  /** Hide constructor, so all access go through pool of cached objects */
+  private HadoopKryo() {
+  }
+
+  // Public API:
+
+  /**
+   * Write type of given object and the object itself to the output stream.
+   * Inverse of readClassAndObject.
+   *
+   * @param out Output stream
+   * @param object Object to write
+   */
+  public static void writeClassAndObject(
+      final DataOutput out, final Object object) {
+    writeInternal(out, object, false);
+  }
+
+  /**
+   * Read object from the input stream, by reading first type of the object,
+   * and then all of it's fields.
+   * Inverse of writeClassAndObject.
+   *
+   * @param in Input stream
+   * @return Deserialized object
+   * @param <T> Type of the object being read
+   */
+  public static <T> T readClassAndObject(DataInput in) {
+    return readInternal(in, null, false);
+  }
+
+  /**
+   * Write an object to output, in a way that can be read by readIntoObject.
+   *
+   * @param out Output stream
+   * @param object Object to be written
+   */
+  public static void writeOutOfObject(
+      final DataOutput out, final Object object) {
+    writeInternal(out, object, true);
+  }
+
+  /**
+   * Reads an object, from input, into a given object,
+   * allowing object reuse.
+   * Inverse of writeOutOfObject.
+   *
+   * @param in Input stream
+   * @param object Object to fill from input
+   */
+  public static void readIntoObject(DataInput in, Object object) {
+    readInternal(in, object, true);
+  }
+
+  /**
+   * Create copy of the object, by magically recursively copying
+   * all of it's fields, keeping reference structures (like cycles)
+   *
+   * @param object Object to be copied
+   * @return Copy of the object.
+   * @param <T> Type of the object
+   */
+  public static <T> T createCopy(final T object) {
+    return KRYO_POOL.run(new KryoCallback<T>() {
+      @Override
+      public T execute(Kryo kryo) {
+        return kryo.copy(object);
+      }
+    });
+  }
+
+  // Private implementation:
+
+  /**
+   * Create new instance of HadoopKryo, properly initialized.
+   *
+   * @return New HadoopKryo instnace
+   */
+  private static HadoopKryo createKryo() {
+    HadoopKryo kryo = new HadoopKryo();
+
+    String version = System.getProperty("java.version");
+    char minor = version.charAt(2);
+    if (minor >= '8') {
+      try {
+        kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
+        kryo.register(Class.forName("com.esotericsoftware.kryo.Kryo$Closure"),
+            new ClosureSerializer());
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException(
+            "Trying to use Kryo on >= Java 8 (" + version +
+            "), but unable to find needed classes", e);
+      }
+    }
+
+    kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
+    kryo.register(Collections.nCopies(1, new Object()).getClass(),
+        new CollectionsNCopiesSerializer());
+
+    ImmutableListSerializer.registerSerializers(kryo);
+
+    // TODO move guava version to 18.0, and remove this fix:
+    try {
+      kryo.register(
+          Class.forName("com.google.common.collect.RegularImmutableList"),
+          new ImmutableListSerializer());
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(
+          "Guava has RegularImmutableList missing", e);
+    }
+
+    // There are many fastutil classes, register them at the end,
+    // so they don't use up small registration numbers
+    FastUtilSerializer.registerAll(kryo);
+
+    kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
+        new StdInstantiatorStrategy()));
+
+    kryo.setDefaultSerializer(new SerializerFactory() {
+      @SuppressWarnings("rawtypes")
+      @Override
+      public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
+        for (final Entry<Class<?>, String> entry :
+            NON_SERIALIZABLE.entrySet()) {
+          if (entry.getKey().isAssignableFrom(type)) {
+            // Allow Class object to be serialized, but not a live instance.
+            return new Serializer() {
+              @Override
+              public Object read(Kryo kryo, Input input, Class type) {
+                throw new RuntimeException("Cannot serialize " + type +
+                    ". Objects being serialized cannot capture " +
+                    entry.getKey() + " because " + entry.getValue() +
+                    ". Either remove field in question" +
+                    ", or make it transient (so that it isn't serialized)");
+              }
+
+              @Override
+              public void write(Kryo kryo, Output output, Object object) {
+                throw new RuntimeException("Cannot serialize " + type +
+                    ". Objects being serialized cannot capture " +
+                    entry.getKey() + " because " + entry.getValue() +
+                    ". Either remove field in question" +
+                    ", or make it transient (so that it isn't serialized)");
+              }
+            };
+          }
+        }
+
+        if (Writable.class.isAssignableFrom(type) &&
+            !KryoIgnoreWritable.class.isAssignableFrom(type) &&
+            // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
+            // for lack of constructors
+            !BasicSet.class.isAssignableFrom(type) &&
+            !BasicArrayList.class.isAssignableFrom(type) &&
+            !Basic2ObjectMap.class.isAssignableFrom(type)) {
+          // use the Writable method defined by the type
+          DirectWritableSerializer serializer = new DirectWritableSerializer();
+          return serializer;
+        } else {
+          FieldSerializer serializer = new FieldSerializer<>(kryo, type);
+          serializer.setIgnoreSyntheticFields(false);
+          return serializer;
+        }
+      }
+    });
+
+    return kryo;
+  }
+
+  /**
+   * Initialize reusable objects for reading from given DataInput.
+   *
+   * @param in Input stream
+   */
+  private void setDataInput(DataInput in) {
+    dataInputWrapperStream.setDataInput(in);
+    input.setInputStream(dataInputWrapperStream);
+  }
+
+  /**
+   * Initialize reusable objects for writing into given DataOutput.
+   *
+   *  @param out Output stream
+   */
+  private void setDataOutput(DataOutput out) {
+    dataOutputWrapperStream.setDataOutput(out);
+    output.setOutputStream(dataOutputWrapperStream);
+  }
+
+  /**
+   * Get or create reusable serializer for given class.
+   *
+   * @param type Type of the object
+   * @return Serializer
+   */
+  private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
+      Class<?> type) {
+    ReusableFieldSerializer<Object> serializer =
+        classToIntoSerializer.get(type);
+    if (serializer == null) {
+      serializer = new ReusableFieldSerializer<>(this, type);
+      classToIntoSerializer.put(type, serializer);
+    }
+    return serializer;
+  }
+
+  /**
+   * Internal write implementation, that reuses HadoopKryo objects
+   * from the pool.
+   *
+   * @param out Output stream
+   * @param object Object to be written
+   * @param outOf whether we are writing reusable objects,
+   *              or full objects with class name
+   */
+  private static void writeInternal(
+      final DataOutput out, final Object object, final boolean outOf) {
+    KRYO_POOL.run(new KryoCallback<Void>() {
+      @Override
+      public Void execute(Kryo kryo) {
+        HadoopKryo hkryo = (HadoopKryo) kryo;
+        hkryo.setDataOutput(out);
+
+        if (outOf) {
+          hkryo.writeOutOfObject(hkryo.output, object);
+        } else {
+          hkryo.writeClassAndObject(hkryo.output, object);
+        }
+
+        hkryo.output.endChunks();
+        hkryo.output.close();
+
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Internal read implementation, that reuses HadoopKryo objects
+   * from the pool.
+   *
+   * @param in Input stream
+   * @param outObject Object to fill from input (if not null)
+   * @param into whether we are reading reusable objects,
+   *             or full objects with class name
+   * @return Read object (new one, or same passed in if we use reusable)
+   * @param <T> Type of the object to read
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T readInternal(
+      final DataInput in, final T outObject, final boolean into) {
+    return KRYO_POOL.run(new KryoCallback<T>() {
+      @Override
+      public T execute(Kryo kryo) {
+        HadoopKryo hkryo = (HadoopKryo) kryo;
+        hkryo.setDataInput(in);
+
+        T object;
+        if (into) {
+          hkryo.readIntoObject(hkryo.input, outObject);
+          object = outObject;
+        } else {
+          object = (T) hkryo.readClassAndObject(hkryo.input);
+        }
+        hkryo.input.nextChunks();
+
+        hkryo.input.close();
+        return object;
+      }
+    });
+  }
+
+  /**
+   * Reads an object, from input, into a given object,
+   * allowing object reuse.
+   *
+   * @param input Input stream
+   * @param object Object to fill from input
+   */
+  private void readIntoObject(Input input, Object object) {
+    Preconditions.checkNotNull(object);
+
+    Class<?> type = object.getClass();
+    ReusableFieldSerializer<Object> serializer =
+        getOrCreateReusableSerializer(type);
+
+    serializer.setReadIntoObject(object);
+    Object result = readObject(input, type, serializer);
+
+    Preconditions.checkState(result == object);
+  }
+
+  /**
+   * Write an object to output, in a way that can be read
+   * using readIntoObject.
+   * @param output Output stream
+   * @param object Object to be written
+   */
+  private void writeOutOfObject(Output output, Object object) {
+    ReusableFieldSerializer<Object> serializer =
+        getOrCreateReusableSerializer(object.getClass());
+    writeObject(output, object, serializer);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
new file mode 100644
index 0000000..1e03888
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+
+/**
+ * Class which you can extend to get all serialization/deserialization
+ * done automagically
+ */
+public abstract class KryoWritable implements KryoIgnoreWritable {
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    HadoopKryo.writeOutOfObject(out, this);
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    HadoopKryo.readIntoObject(in, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
new file mode 100644
index 0000000..0f6e73f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Generic wrapper object, making any object writable.
+ *
+ * Uses Kryo inside for serialization.
+ * Current configuration is not optimized for performance,
+ * but Writable interface doesn't allow much room for it.
+ *
+ * Note - Java8 lambdas need to implement Serializable to work.
+ *
+ * @param <T> Object type
+ */
+public class KryoWritableWrapper<T> implements Writable {
+  /** Wrapped object */
+  private T object;
+
+  /**
+   * Create wrapper given an object.
+   * @param object Object instance
+   */
+  public KryoWritableWrapper(T object) {
+    this.object = object;
+  }
+
+  /**
+   * Creates wrapper initialized with null.
+   */
+  public KryoWritableWrapper() {
+  }
+
+  /**
+   * Unwrap the object value
+   * @return Object value
+   */
+  public T get() {
+    return object;
+  }
+
+  /**
+   * Set wrapped object value
+   * @param object New object value
+   */
+  public void set(T object) {
+    this.object = object;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws java.io.IOException {
+    object = HadoopKryo.readClassAndObject(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopKryo.writeClassAndObject(out, object);
+  }
+
+  /**
+   * Returns Writable instance, wrapping given object only
+   * if it is not already writable.
+   *
+   * @param object Object to potentially wrap
+   * @return Writable object holding argument
+   */
+  public static Writable wrapIfNeeded(Object object) {
+    if (object instanceof Writable) {
+      return (Writable) object;
+    } else {
+      return new KryoWritableWrapper<>(object);
+    }
+  }
+
+  /**
+   * Unwrap Writable object if it was wrapped initially,
+   * inverse of wrapIfNeeded function.
+   * @param value Potentially wrapped value
+   * @return Original unwrapped value
+   * @param <T> Type of returned object.
+   */
+  public static <T> T unwrapIfNeeded(Writable value) {
+    if (value instanceof KryoWritableWrapper) {
+      return ((KryoWritableWrapper<T>) value).get();
+    } else {
+      return (T) value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
new file mode 100644
index 0000000..cd26be8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.util.Random;
+
+/**
+ * Transient Random class. Seed/state is not kept after
+ * serializing/deserializing.
+ *
+ * Within Blocks Framework - if we initialize Random within the Piece, when
+ * it's serialzied and copied to all workers and all threads - keeping seed
+ * would cause same series of random numbers to be generated everywhere.
+ *
+ * So this class is safe to be used in Pieces, while using regular Random
+ * class is forbidden to be serialized.
+ * Best approach would be to not have Random serialized, and create it on
+ * workers, where possible.
+ */
+public class TransientRandom {
+  /** Instance of random object */
+  private final transient Random random = new Random();
+
+  /**
+   * Get instance of Random
+   * @return Random instance
+   */
+  public Random get() {
+    return random;
+  }
+
+  /**
+   * Returns a pseudorandom, uniformly distributed {@code int} value
+   * between 0 (inclusive) and the specified value (exclusive), drawn from
+   * this random number generator's sequence.
+   *
+   * @param n Given upper limit
+   * @return pseudorandom integer number in [0, n) range.
+   */
+  public int nextInt(int n) {
+    return random.nextInt(n);
+  }
+
+  /**
+   * Returns the next pseudorandom, uniformly distributed
+   * {@code double} value between {@code 0.0} and
+   * {@code 1.0} from this random number generator's sequence.
+   *
+   * @return pseudorandom number in [0, 1)
+   */
+  public double nextDouble() {
+    return random.nextDouble();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
new file mode 100644
index 0000000..58dd78d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Marker interface, specifying that kryo should serialize it on it's own,
+ * and ignore actual Writable method implementations.
+ *
+ * If you are using HadoopKryo.writeOutOfObject/readIntoObject result is the
+ * same, and adding it allows wrapping Kryo context into writable context,
+ * and then wrapping it back into Kryo context.
+ */
+public interface KryoIgnoreWritable extends Writable {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
new file mode 100644
index 0000000..66840c5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+
+/**
+ * Marker interface saying that class should never be serialized,
+ * that it is code error for it to be tried to be serialized.
+ */
+public interface NonKryoWritable {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
new file mode 100644
index 0000000..5fcfa72
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Marker interfaces used by HadoopKryo to special-case serialization.
+ */
+package org.apache.giraph.writable.kryo.markers;

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
new file mode 100644
index 0000000..a04bb65
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for using Kryo to efficiently serialize objects,
+ * and integrate with Writable interface.
+ * HadoopKryo is a main class doing serialization.
+ */
+package org.apache.giraph.writable.kryo;

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
new file mode 100644
index 0000000..3d66eb7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Special serializer for Arrays.asList() as they can not be
+ * deserialized in a standard way.
+ * {@see
+ * https://groups.google.com/forum/#!msg/kryo-users/2lXTCEOSxA0/gLzIZRtaNCUJ}
+ */
+public class ArraysAsListSerializer extends Serializer<List> {
+  @Override
+  public void write(Kryo kryo, Output output, List object) {
+    kryo.writeObject(output, object.toArray(new Object[object.size()]));
+  }
+
+  @Override
+  public List read(Kryo kryo, Input input, Class<List> type) {
+    return Arrays.asList(kryo.readObject(input, Object[].class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
new file mode 100644
index 0000000..5460692
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Special serializer for Collections.nCopies
+ *
+ * @param <T> Element type
+ */
+public class CollectionsNCopiesSerializer<T> extends Serializer<List<T>> {
+  @Override
+  public void write(Kryo kryo, Output output, List<T> object) {
+    output.writeInt(object.size(), true);
+    if (object.size() > 0) {
+      kryo.writeClassAndObject(output, object.get(0));
+    }
+  }
+
+  @Override
+  public List<T> read(Kryo kryo, Input input, Class<List<T>> type) {
+    int size = input.readInt(true);
+    if (size > 0) {
+      T object = (T) kryo.readClassAndObject(input);
+      return Collections.nCopies(size, object);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
new file mode 100644
index 0000000..1967e6a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A custom Serializer that will call the Writable methods defined by the
+ * object itself to serialize the object, instead of Kryo auto-magically
+ * serializing
+ *
+ * @param <T> Object type, should implement Writable
+ */
+
+public class DirectWritableSerializer<T extends Writable>
+    extends Serializer<T> {
+
+  @Override
+  public void write(Kryo kryo, Output output, T object) {
+    try {
+      object.write(new DataOutputStream(output));
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "DirectWritableSerializer.write calling Writable method of class: " +
+            object.getClass().getName() + " encountered issues", e);
+    }
+  }
+
+  @Override
+  public T read(Kryo kryo, Input input, Class<T> type) {
+    try {
+      T object = create(kryo, input, type);
+      kryo.reference(object);
+      object.readFields(new DataInputStream(input));
+
+      return object;
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "DirectWritableSerializer.read calling Writable method of class: " +
+          type.getName() + " encountered issues", e);
+    }
+  }
+
+  /**
+   * Used by {@link #read(Kryo, Input, Class)} to create the new object.
+   * This can be overridden to customize object creation, eg to call a
+   * constructor with arguments. The default implementation
+   * uses {@link Kryo#newInstance(Class)}.
+   *
+   * @param kryo Kryo object instance
+   * @param input Input
+   * @param type Type of the class to create
+   * @return New instance of wanted type
+   */
+  protected T create(Kryo kryo, Input input, Class<T> type) {
+    return ReflectionUtils.newInstance(type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
new file mode 100644
index 0000000..0eb9676
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * Kryo Serializer for Fastutil collection class.
+ * By default, because they extend boxed collections, are being serialized very
+ * inefficiently through a lot of temporary object creation.
+ *
+ * We are relying that fastutil classes are written to be correctly serialized
+ * with Java serialization, and have put transient on all array fields and are
+ * doing custom efficient serialization in writeObject/readObject methods.
+ * This Serializer then swaps ObjectOutputStream for all default fields with
+ * FieldSerializer, and then calls appropriate writeObject/readObject methods.
+ * We are also relying on defaultWriteObject/defaultReadObject being
+ * effectively called first within those methods
+ *
+ * @param <T> Object type
+ */
+public class FastUtilSerializer<T> extends Serializer<T> {
+  /** List of all types generated by fastutil */
+  private static final String[] PRIMITIVE_TYPES = new String[] {
+    "Boolean", "Byte", "Short", "Int", "Long", "Float", "Double", "Char",
+    "Object"};
+  /** List of all types used as keys in fastutil */
+  private static final String[] PRIMITIVE_KEY_TYPES = new String[] {
+    "Byte", "Short", "Int", "Long", "Float", "Double", "Char", "Object"};
+
+  /** Field serializer for this fastutil class */
+  private final FieldSerializer<T> fieldSerializer;
+
+  /** Handle to writeObject Method on this fastutil class*/
+  private final Method writeMethod;
+  /** Handle to readObject Method on this fastutil class*/
+  private final Method readMethod;
+  /** Reusable output stream wrapper */
+  private final FastUtilSerializer.FastutilKryoObjectOutputStream outputWrapper;
+  /** Reusable input stream wrapper */
+  private final FastUtilSerializer.FastutilKryoObjectInputStream inputWrapper;
+
+  /**
+   * Creates and initializes new serializer for a given fastutil class.
+   * @param kryo Kryo instance
+   * @param type Fastutil class
+   */
+  public FastUtilSerializer(Kryo kryo, Class<T> type) {
+    fieldSerializer = new FieldSerializer<>(kryo, type);
+    fieldSerializer.setIgnoreSyntheticFields(false);
+
+    try {
+      writeMethod = type.getDeclaredMethod(
+          "writeObject", ObjectOutputStream.class);
+      writeMethod.setAccessible(true);
+      readMethod = type.getDeclaredMethod(
+          "readObject", ObjectInputStream.class);
+      readMethod.setAccessible(true);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(
+          "Fastutil class " + type +
+          " doesn't have readObject/writeObject methods", e);
+    }
+
+    try {
+      outputWrapper = new FastutilKryoObjectOutputStream();
+      inputWrapper = new FastutilKryoObjectInputStream();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Register serializer for a given fastutil class.
+   * @param kryo Kryo instance
+   * @param fastutilClass Fastutil class
+   */
+  public static void register(Kryo kryo, Class<?> fastutilClass) {
+    kryo.register(fastutilClass, new FastUtilSerializer<>(kryo, fastutilClass));
+  }
+
+  /**
+   * Registers serializers for all possible fastutil classes.
+   *
+   * There are many fastutil classes, so it is recommended to call this
+   * function at the end, so they fastutil classes don't use up small
+   * registration numbers.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerAll(Kryo kryo) {
+    registerArrayLists(kryo);
+    registerArrayBigList(kryo);
+    registerOpenHashSets(kryo);
+    registerArraySets(kryo);
+    registerRBTreeSets(kryo);
+    registerAVLTreeSets(kryo);
+    registerOpenHashMaps(kryo);
+    registerRBTreeMaps(kryo);
+    registerAVLTreeMaps(kryo);
+
+    // Note - HeapPriorityQueues don't extend boxed collection,
+    // and so they work out of the box correctly
+  }
+
+  /**
+   * Register all Fastutil ArrayLists.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerArrayLists(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_ArrayList", PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil ArrayBigLists.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerArrayBigList(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_BigArrayBigList", PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil OpenHashSets.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerOpenHashSets(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_OpenHashSet", PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil ArraySets.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerArraySets(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_ArraySet", PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil RBTreeSets.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerRBTreeSets(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_RBTreeSet", PRIMITIVE_KEY_TYPES));
+  }
+
+  /**
+   * Register all Fastutil AVLTreeSets.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerAVLTreeSets(Kryo kryo) {
+    registerAll(kryo, singleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_AVLTreeSet", PRIMITIVE_KEY_TYPES));
+  }
+
+  /**
+   * Register all Fastutil OpenHashMaps.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerOpenHashMaps(Kryo kryo) {
+    registerAll(kryo, doubleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_2_T2_OpenHashMap",
+        PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil RBTreeMaps.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerRBTreeMaps(Kryo kryo) {
+    registerAll(kryo, doubleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_2_T2_RBTreeMap",
+        PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all Fastutil AVLTreeMaps.
+   *
+   * @param kryo Kryo instance
+   */
+  public static void registerAVLTreeMaps(Kryo kryo) {
+    registerAll(kryo, doubleTypes(
+        "it.unimi.dsi.fastutil._t1_s._T1_2_T2_AVLTreeMap",
+        PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+  }
+
+  /**
+   * Register all class from the list of classes.
+   *
+   * @param kryo Kryo instance
+   * @param types List of classes
+   */
+  private static void registerAll(Kryo kryo, ArrayList<Class<?>> types) {
+    for (Class<?> type : types) {
+      register(kryo, type);
+    }
+  }
+
+  /**
+   * Returns list of all classes that are generated by using given
+   * pattern, and replacing it with passed list of types.
+   * Pattern contains _t1_ and _T1_, for lowercase and actual name.
+   *
+   * @param pattern Given pattern
+   * @param types Given list of strings to replace into pattern
+   * @return List of all classes
+   */
+  private static ArrayList<Class<?>> singleTypes(
+      String pattern, String[] types) {
+    ArrayList<Class<?>> result = new ArrayList<>();
+
+    for (String type : types) {
+      try {
+        result.add(Class.forName(
+            pattern.replaceAll("_T1_", type).replaceAll(
+                "_t1_", type.toLowerCase())));
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(pattern + " " + type, e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns list of all classes that are generated by using given
+   * pattern, and replacing it with passed list of types.
+   * Pattern contains two variable pairs: _t1_, _T1_ and _t2_, _T2_,
+   * in each pair one for lowercase and one for actual name.
+   *
+   * @param pattern Given pattern
+   * @param types1 Given list of strings to replace t1 into pattern
+   * @param types2 Given list of strings to replace t2 into pattern
+   * @return List of all classes
+   */
+  private static ArrayList<Class<?>> doubleTypes(
+      String pattern, String[] types1, String[] types2) {
+    ArrayList<Class<?>> result = new ArrayList<>();
+
+    for (String type1 : types1) {
+      for (String type2 : types2) {
+        try {
+          result.add(Class.forName(
+              pattern.replaceAll("_T1_", type1).replaceAll(
+                  "_t1_", type1.toLowerCase())
+                .replaceAll("_T2_", type2).replaceAll(
+                    "_t2_", type2.toLowerCase())));
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException(pattern + " " + type1 + " " + type2, e);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, T object) {
+    fieldSerializer.write(kryo, output, object);
+
+    outputWrapper.set(output, kryo);
+    try {
+      writeMethod.invoke(object, outputWrapper);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException("writeObject failed", e);
+    }
+  }
+
+  @Override
+  public T read(Kryo kryo, Input input, Class<T> type) {
+    T result = fieldSerializer.read(kryo, input, type);
+
+    if (result != null) {
+      inputWrapper.set(input, kryo);
+      try {
+        readMethod.invoke(result, inputWrapper);
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException("readObject failed", e);
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Wrapper around ObjectOutputStream that ignores defaultWriteObject (assumes
+   * that needed logic was already executed before), and passes all other calls
+   * to Output
+   */
+  private static class FastutilKryoObjectOutputStream
+      extends ObjectOutputStream {
+    /** Output */
+    private Output output;
+    /** Kryo */
+    private Kryo kryo;
+
+    /** Constructor */
+    FastutilKryoObjectOutputStream() throws IOException {
+      super();
+    }
+
+    /**
+     * Setter
+     *
+     * @param output Output
+     * @param kryo kryo
+     */
+    public void set(Output output, Kryo kryo) {
+      this.output = output;
+      this.kryo = kryo;
+    }
+
+    @Override
+    public void defaultWriteObject() throws IOException {
+    }
+
+    @Override
+    public void writeBoolean(boolean val) throws IOException {
+      output.writeBoolean(val);
+    }
+
+    @Override
+    public void writeByte(int val) throws IOException  {
+      output.writeByte(val);
+    }
+
+    @Override
+    public void writeShort(int val)  throws IOException {
+      output.writeShort(val);
+    }
+
+    @Override
+    public void writeChar(int val)  throws IOException {
+      output.writeChar((char) val);
+    }
+
+    @Override
+    public void writeInt(int val)  throws IOException {
+      output.writeInt(val, false);
+    }
+
+    @Override
+    public void writeLong(long val)  throws IOException {
+      output.writeLong(val, false);
+    }
+
+    @Override
+    public void writeFloat(float val) throws IOException {
+      output.writeFloat(val);
+    }
+
+    @Override
+    public void writeDouble(double val) throws IOException {
+      output.writeDouble(val);
+    }
+
+    @Override
+    protected void writeObjectOverride(Object obj) throws IOException {
+      kryo.writeClassAndObject(output, obj);
+    }
+  }
+
+  /**
+   * Wrapper around ObjectOutputStream that ignores defaultReadObject
+   * (assumes that needed logic was already executed before), and passes
+   * all other calls to Output
+   */
+  private static class FastutilKryoObjectInputStream extends ObjectInputStream {
+    /** Input */
+    private Input input;
+    /** Kryo */
+    private Kryo kryo;
+
+    /** Constructor */
+    FastutilKryoObjectInputStream() throws IOException {
+      super();
+    }
+
+    /**
+     * Setter
+     *
+     * @param input Input
+     * @param kryo Kryo
+     */
+    public void set(Input input, Kryo kryo) {
+      this.input = input;
+      this.kryo = kryo;
+    }
+
+    @Override
+    public void defaultReadObject() throws IOException, ClassNotFoundException {
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException {
+      return input.readBoolean();
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+      return input.readByte();
+    }
+
+    @Override
+    public char readChar() throws IOException {
+      return input.readChar();
+    }
+
+    @Override
+    public short readShort() throws IOException {
+      return input.readShort();
+    }
+
+    @Override
+    public int readInt() throws IOException {
+      return input.readInt(false);
+    }
+
+    @Override
+    public long readLong() throws IOException {
+      return input.readLong(false);
+    }
+
+    @Override
+    public float readFloat() throws IOException {
+      return input.readFloat();
+    }
+
+    @Override
+    public double readDouble() throws IOException {
+      return input.readDouble();
+    }
+
+    @Override
+    protected Object readObjectOverride()
+        throws IOException, ClassNotFoundException {
+      return kryo.readClassAndObject(input);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
new file mode 100644
index 0000000..f11139d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializer used to deserialize data into object, instead of creating a new
+ * value readIntoObject needs to be set right before deserialization is called
+ * on it.
+ *
+ * @param <T> Object type
+ */
+public class ReusableFieldSerializer<T> extends FieldSerializer<T> {
+  /** Current object into which to deserialize */
+  private T readIntoObject;
+
+  /**
+   * Creates new reusable field serializer for a given type.
+   * @param kryo HadoopKryo object
+   * @param type Type of object
+   */
+  public ReusableFieldSerializer(Kryo kryo, Class type) {
+    super(kryo, type);
+  }
+
+  public void setReadIntoObject(T value) {
+    this.readIntoObject = value;
+  }
+
+  @Override
+  protected T create(Kryo kryo, Input input, Class<T> type) {
+    Preconditions.checkNotNull(readIntoObject);
+    Preconditions.checkState(readIntoObject.getClass().equals(type));
+    T toReturn = readIntoObject;
+    readIntoObject = null;
+    return toReturn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
new file mode 100644
index 0000000..9eabf96
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementations of custom serializers needed for HadoopKryo
+ */
+package org.apache.giraph.writable.kryo.serializers;

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
new file mode 100644
index 0000000..3898b82
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import static org.junit.Assert.assertEquals;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+/**
+ * Tests some subtle cases of kryo serialization.
+ */
+public class KryoWritableTest {
+  public static class TestClassA extends KryoWritable {
+    final String testObject;
+    final List list;
+    final int something;
+
+    public TestClassA(String testObject, List list, int something) {
+      this.testObject = testObject;
+      this.list = list;
+      this.something = something;
+    }
+
+    public TestClassA() {
+      this.testObject = null;
+      this.list = null;
+      this.something = -1;
+    }
+  }
+
+  @Test
+  public void testTestClassA() throws Exception {
+    String testObject = "Hello World!";
+    TestClassA res = new TestClassA();
+    WritableUtils.copyInto(
+        new TestClassA(testObject, Arrays.asList(1, 2, 3), 5), res, true);
+
+    assertEquals(testObject, res.testObject);
+
+    assertEquals(3, res.list.size());
+    assertEquals(1, res.list.get(0));
+    assertEquals(2, res.list.get(1));
+    assertEquals(3, res.list.get(2));
+
+    assertEquals(5, res.something);
+  }
+
+  public static class LongKryoWritable extends KryoWritable {
+    private long value;
+
+    public LongKryoWritable(long value) {
+      this.value = value;
+    }
+
+    public long get() {
+      return value;
+    }
+
+    public void set(long value) {
+      this.value = value;
+    }
+  }
+
+
+  int multiplier = 5000; // use 5000 for profiling
+  int longTestTimes = 1000 * multiplier;
+
+  @Test
+  public void testLongKryoWritable() throws Exception {
+    LongKryoWritable from = new LongKryoWritable(0);
+    LongKryoWritable to = new LongKryoWritable(0);
+
+    for (int i = 0; i < longTestTimes; i++) {
+      from.set(i);
+      WritableUtils.copyInto(from, to, true);
+      assertEquals(i, to.get());
+    }
+  }
+
+  @Test
+  public void testLongWritable() throws Exception {
+    LongWritable from = new LongWritable(0);
+    LongWritable to = new LongWritable(0);
+
+    for (int i = 0; i < longTestTimes; i++) {
+      from.set(i);
+      WritableUtils.copyInto(from, to, true);
+      assertEquals(i, to.get());
+    }
+  }
+
+  public static class LongListKryoWritable extends KryoWritable {
+    public LongArrayList value;
+
+    public LongListKryoWritable(LongArrayList value) {
+      this.value = value;
+    }
+  }
+
+  int longListTestTimes = 1 * multiplier;
+  int longListTestSize = 100000;
+
+  @Test
+  public void testLongListKryoWritable() throws Exception {
+    LongArrayList list = new LongArrayList(longListTestSize);
+    for (int i = 0; i < longListTestSize; i++) {
+      list.add(i);
+    }
+
+    LongListKryoWritable from = new LongListKryoWritable(list);
+    LongListKryoWritable to = new LongListKryoWritable(null);
+
+    for (int i = 0; i < longListTestTimes; i++) {
+      from.value.set((2 * i) % longListTestSize, 0);
+      WritableUtils.copyInto(from, to, true);
+    }
+  }
+
+  @Test
+  public void testLongListWritable() throws Exception {
+    BasicLongArrayList from = new BasicLongArrayList(longListTestSize);
+    LongWritable value = new LongWritable();
+    for (int i = 0; i < longListTestSize; i++) {
+      value.set(i);
+      from.add(value);
+    }
+
+    BasicLongArrayList to = new BasicLongArrayList(longListTestSize);
+    value.set(0);
+
+    for (int i = 0; i < longListTestTimes; i++) {
+      from.set((2 * i) % longListTestSize, value);
+      WritableUtils.copyInto(from, to, true);
+    }
+  }
+
+  public static class NestedKryoWritable<T> extends KryoWritable {
+    public LongKryoWritable value1;
+    public T value2;
+
+    public NestedKryoWritable(LongKryoWritable value1, T value2) {
+      this.value1 = value1;
+      this.value2 = value2;
+    }
+  }
+
+  @Test
+  public void testNestedKryoWritable() throws Exception {
+    LongKryoWritable inner = new LongKryoWritable(5);
+    NestedKryoWritable<LongKryoWritable> res = new NestedKryoWritable<>(null, null);
+    WritableUtils.copyInto(
+        new NestedKryoWritable<>(inner, inner), res, true);
+
+    assertEquals(5, res.value1.get());
+    Assert.assertTrue(res.value1 == res.value2);
+  }
+
+  @Test
+  public void testRecursiveKryoWritable() throws Exception {
+    LongKryoWritable inner = new LongKryoWritable(5);
+    NestedKryoWritable wanted = new NestedKryoWritable<>(inner, null);
+    wanted.value2 = wanted;
+
+    NestedKryoWritable res = new NestedKryoWritable<>(null, null);
+    WritableUtils.copyInto(wanted, res, true);
+
+    assertEquals(5, res.value1.get());
+    Assert.assertTrue(res == res.value2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
new file mode 100644
index 0000000..2291f16
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import it.unimi.dsi.fastutil.chars.Char2ObjectMap;
+import it.unimi.dsi.fastutil.chars.Char2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.Int2BooleanMap;
+import it.unimi.dsi.fastutil.ints.Int2BooleanOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+
+
+/**
+ * Tests some subtle cases of kryo serialization.
+ */
+public class KryoWritableWrapperTest {
+  public static <T> T kryoSerDeser(T t) throws IOException {
+    KryoWritableWrapper<T> wrapped = new KryoWritableWrapper<>(t);
+    KryoWritableWrapper<T> deser = new KryoWritableWrapper<>();
+    WritableUtils.copyInto(wrapped, deser, true);
+    return deser.get();
+  }
+
+  @Test
+  public void testArraysAsList() throws IOException {
+    List res = kryoSerDeser(Arrays.asList(1, 2, 3));
+
+    assertEquals(3, res.size());
+    assertEquals(1, res.get(0));
+    assertEquals(2, res.get(1));
+    assertEquals(3, res.get(2));
+  }
+
+
+  @Test
+  public void testArraysAsListMultiRef() throws IOException {
+    List list = Arrays.asList(1, 2, 3);
+    Object obj = new Object();
+    List wanted = Arrays.asList(list, list, obj, obj, null);
+    wanted.set(4, wanted);
+    List res = kryoSerDeser(wanted);
+
+    assertTrue(res.get(0) == res.get(1));
+    assertTrue(res.get(2) == res.get(3));
+    // TODO see if this can be supported, though this is a rare case:
+    // assertTrue(res == res.get(4));
+  }
+
+  @Test
+  public void testCollectionsNCopiesList() throws IOException {
+    List res = kryoSerDeser(Collections.nCopies(3, 42));
+
+    assertEquals(3, res.size());
+    assertEquals(42, res.get(0));
+    assertEquals(42, res.get(1));
+    assertEquals(42, res.get(2));
+  }
+
+  @Test
+  public void testCollectionsNCopiesObjectList() throws IOException {
+    String testObject = "Hello World!";
+    List<String> res = kryoSerDeser(Collections.nCopies(3, testObject));
+
+    assertEquals(3, res.size());
+    assertEquals(testObject, res.get(0));
+    assertEquals(testObject, res.get(1));
+    assertEquals(testObject, res.get(2));
+  }
+
+  @Test
+  public void testUnmodifiableIterator() throws IOException {
+    Iterator<Integer> in = Iterables.concat(
+        Arrays.asList(0, 1),
+        Arrays.asList(2, 3),
+        Arrays.asList(4, 5)).iterator();
+
+    in.next();
+    in.next();
+    in.next();
+    Iterator res = kryoSerDeser(in);
+
+    int cnt = 3;
+    for(; res.hasNext(); cnt++) {
+      assertEquals(cnt, res.next());
+    }
+    assertEquals(6, cnt);
+  }
+
+  @Test
+  public void testIteratorsConcat() throws IOException {
+    Iterator<Integer> in = Iterators.concat(
+        Arrays.asList(0, 1).iterator(),
+        Arrays.asList(2, 3).iterator(),
+        Arrays.asList(4, 5).iterator());
+
+    in.next();
+    in.next();
+    in.next();
+
+    Iterator res = kryoSerDeser(in);
+
+    int cnt = 3;
+    for(; res.hasNext(); cnt++) {
+      assertEquals(cnt, res.next());
+    }
+    assertEquals(6, cnt);
+
+  }
+
+  @Test
+  public void testImmutableList() throws IOException {
+    {
+      List res = kryoSerDeser(ImmutableList.of(1, 2));
+      assertEquals(2, res.size());
+      assertEquals(1, res.get(0));
+      assertEquals(2, res.get(1));
+    }
+
+    {
+      List list = ImmutableList.of(1, 2, 3);
+      Object obj = new Object();
+      List wanted = ImmutableList.of(list, list, obj, obj);
+      List res = kryoSerDeser(wanted);
+
+      assertTrue(res.get(0) == res.get(1));
+      assertTrue(res.get(2) == res.get(3));
+    }
+  }
+
+  @Test
+  public void testFastutilSet() throws ClassNotFoundException, IOException {
+    LongOpenHashSet set = new LongOpenHashSet();
+    set.add(6);
+    LongOpenHashSet deser = kryoSerDeser(set);
+    deser.add(5);
+    set.add(5);
+    Assert.assertEquals(set, deser);
+  }
+
+  @Test
+  public void testFastutilLongList() throws ClassNotFoundException, IOException {
+    LongArrayList list = new LongArrayList();
+    list.add(6);
+    LongArrayList deser = kryoSerDeser(list);
+    deser.add(5);
+    list.add(5);
+    Assert.assertEquals(list, deser);
+  }
+
+  @Test
+  public void testFastutilFloatList() throws ClassNotFoundException, IOException {
+    FloatArrayList list = new FloatArrayList();
+    list.add(6L);
+    FloatArrayList deser = kryoSerDeser(list);
+    deser.add(5L);
+    list.add(5L);
+    Assert.assertEquals(list, deser);
+  }
+
+  @Test
+  public void testFastutilMap() throws ClassNotFoundException, IOException {
+    Int2BooleanMap list = new Int2BooleanOpenHashMap();
+    list.put(6, true);
+    Int2BooleanMap deser = kryoSerDeser(list);
+    deser.put(5, false);
+    list.put(5, false);
+    Assert.assertEquals(list, deser);
+  }
+
+  @Test
+  public void testFastutil2ObjMap() throws ClassNotFoundException, IOException {
+    Char2ObjectMap<IntWritable> list = new Char2ObjectOpenHashMap<>();
+    list.put('a', new IntWritable(6));
+    list.put('q', new IntWritable(7));
+    list.put('w', new IntWritable(8));
+    list.put('e', new IntWritable(9));
+    list.put('r', new IntWritable(7));
+    list.put('c', null);
+    Char2ObjectMap<IntWritable> deser = kryoSerDeser(list);
+    deser.put('b', null);
+    list.put('b', null);
+
+    Assert.assertEquals(list, deser);
+  }
+
+  @Test
+  @Ignore("Long test used for profiling compiling speed")
+  public void testLongFastutilListProfile() throws ClassNotFoundException, IOException {
+    int n = 100;
+    int rounds = 2000000;
+    LongArrayList list = new LongArrayList(n);
+    for (int i = 0; i < n; i++) {
+      list.add(i);
+    }
+
+    for (int round = 0; round < rounds; round ++) {
+      LongArrayList deser = kryoSerDeser(list);
+      deser.add(round);
+      list.add(round);
+      Assert.assertEquals(list.size(), deser.size());
+      Assert.assertArrayEquals(list.elements(), deser.elements());
+
+      list.popLong();
+    }
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testRandom() throws ClassNotFoundException, IOException {
+    kryoSerDeser(new Random()).nextBoolean();
+  }
+
+  private static class TestConf implements GiraphConfigurationSettable {
+    @Override
+    public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    }
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testConfiguration() throws ClassNotFoundException, IOException {
+    kryoSerDeser(new Configuration());
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testConfigurable() throws ClassNotFoundException, IOException {
+    kryoSerDeser(new TestConf());
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testVertexReceiver() throws ClassNotFoundException, IOException {
+    kryoSerDeser(new NonKryoWritable() {
+    });
+  }
+
+
+  @Test
+  public void testBlacklistedClasses() throws ClassNotFoundException, IOException {
+    Assert.assertEquals(kryoSerDeser(Random.class), Random.class);
+    Assert.assertEquals(kryoSerDeser(TestConf.class), TestConf.class);
+    Assert.assertEquals(kryoSerDeser(GiraphConfiguration.class), GiraphConfiguration.class);
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testRecursive() throws ClassNotFoundException, IOException {
+    kryoSerDeser(new KryoWritableWrapper<>(new Object())).get().hashCode();
+  }
+
+  @Test
+  public void testNull() throws ClassNotFoundException, IOException {
+    Assert.assertNull(kryoSerDeser(null));
+  }
+}


Mime
View raw message