tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [06/10] incubator-tinkerpop git commit: Kryo shim configuration tweaks
Date Mon, 06 Jun 2016 20:53:45 GMT
Kryo shim configuration tweaks


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9321a3e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9321a3e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9321a3e1

Branch: refs/heads/TINKERPOP-1321
Commit: 9321a3e14eab4ed05f8ef5f4c77de481a4011b81
Parents: 218d790
Author: Dan LaRocque <dalaro@hopcount.org>
Authored: Mon Jun 6 02:24:12 2016 -0400
Committer: Dan LaRocque <dalaro@hopcount.org>
Committed: Mon Jun 6 03:10:03 2016 -0400

----------------------------------------------------------------------
 .../process/computer/GiraphWorkerContext.java   |   3 +-
 .../gremlin/structure/io/gryo/GryoMapper.java   |  30 ++-
 .../gremlin/structure/io/gryo/GryoPool.java     |   1 +
 .../structure/io/gryo/GryoSerializers.java      |  40 ++--
 .../structure/io/gryo/JavaTimeSerializers.java  | 127 +++++-------
 .../structure/io/gryo/PairSerializer.java       |  11 +-
 .../structure/io/gryo/TypeRegistration.java     |  12 ++
 .../io/gryo/kryoshim/KryoShimService.java       |  16 ++
 .../io/gryo/kryoshim/KryoShimServiceLoader.java |  23 ++-
 .../io/gryo/kryoshim/SerializerShim.java        |   2 +-
 .../hadoop/process/computer/HadoopCombine.java  |   3 +-
 .../hadoop/process/computer/HadoopMap.java      |   3 +-
 .../hadoop/process/computer/HadoopReduce.java   |   3 +-
 .../structure/io/HadoopPoolShimService.java     |   7 +
 .../structure/io/HadoopPoolsConfigurable.java   |   4 +-
 .../structure/io/gryo/GryoRecordReader.java     |   3 +-
 .../structure/io/gryo/GryoRecordWriter.java     |   4 +-
 .../spark/process/computer/SparkExecutor.java   |   3 +-
 .../structure/io/TinkerPopKryoRegistrator.java  | 121 ------------
 .../spark/structure/io/gryo/GryoSerializer.java |   2 +-
 .../io/gryo/IoRegistryAwareKryoSerializer.java  | 116 +++++++++++
 .../io/gryo/TinkerPopKryoRegistrator.java       | 194 +++++++++++++++++++
 .../unshaded/UnshadedKryoShimService.java       | 131 ++++++++-----
 ...n.structure.io.gryo.kryoshim.KryoShimService |   1 +
 .../spark/structure/io/ToyGraphInputRDD.java    |   3 +-
 25 files changed, 572 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
index 86b733c..0122ab4 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Iterator;
 
@@ -45,7 +46,7 @@ public final class GiraphWorkerContext extends WorkerContext {
 
     public void preApplication() throws InstantiationException, IllegalAccessException {
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
-        HadoopPools.initialize(apacheConfiguration);
+        KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
         final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
         this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1));
         this.memory = new GiraphMemory(this, vertexProgram);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 851b03c..41ca44d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
@@ -369,6 +370,20 @@ public final class GryoMapper implements Mapper<Kryo> {
         private Supplier<ClassResolver> classResolver = GryoClassResolver::new;
 
         private Builder() {
+            // Validate the default registrations
+            // For justification of these default registration rules, see TinkerPopKryoRegistrator
+            for (TypeRegistration<?> tr : typeRegistrations) {
+                if (tr.hasSerializer() /* no serializer is acceptable */ &&
+                    null == tr.getSerializerShim() /* a shim serializer is acceptable */ &&
+                    !(tr.getShadedSerializer() instanceof JavaSerializer) /* shaded JavaSerializer is acceptable */) {
+                    // everything else is invalid
+                    String msg = String.format("The default GryoMapper type registration %s is invalid.  " +
+                            "It must supply either an implementation of %s or %s, but supplies neither.  " +
+                            "This is probably a bug in GryoMapper's default serialization class registrations.", tr,
+                            SerializerShim.class.getCanonicalName(), JavaSerializer.class.getCanonicalName());
+                    throw new IllegalStateException(msg);
+                }
+            }
         }
 
         /**
@@ -538,8 +553,8 @@ public final class GryoMapper implements Mapper<Kryo> {
             if (1 < serializerCount) {
                 String msg = String.format(
                         "GryoTypeReg accepts at most one kind of serializer, but multiple " +
-                                "serializers were supplied for class %s (id %s).  " +
-                                "Shaded serializer: %s.  Shim serializer: %s.  Shaded serializer function: %s.",
+                        "serializers were supplied for class %s (id %s).  " +
+                        "Shaded serializer: %s.  Shim serializer: %s.  Shaded serializer function: %s.",
                         this.clazz.getCanonicalName(), id,
                         this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo);
                 throw new IllegalArgumentException(msg);
@@ -603,5 +618,16 @@ public final class GryoMapper implements Mapper<Kryo> {
 
             return kryo;
         }
+
+        @Override
+        public String toString() {
+            return new ToStringBuilder(this)
+                    .append("targetClass", clazz)
+                    .append("id", id)
+                    .append("shadedSerializer", shadedSerializer)
+                    .append("serializerShim", serializerShim)
+                    .append("functionOfShadedKryo", functionOfShadedKryo)
+                    .toString();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
index e7bf636..59f8a5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java
@@ -40,6 +40,7 @@ import java.util.function.Function;
 public final class GryoPool {
     public static final String CONFIG_IO_REGISTRY = "gremlin.io.registry";
     public static final String CONFIG_IO_GRYO_POOL_SIZE = "gremlin.io.gryo.poolSize";
+    public static final int CONFIG_IO_GRYO_POOL_SIZE_DEFAULT = 256;
 
     public enum Type {READER, WRITER, READER_WRITER}
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
index ae99ac6..16fbe85 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
@@ -23,16 +23,16 @@ import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
 
 /**
  * Class used to serialize graph-based objects such as vertices, edges, properties, and paths. These objects are
@@ -42,19 +42,19 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-final class GryoSerializers {
+public final class GryoSerializers {
 
     /**
      * Serializes any {@link Edge} implementation encountered to a {@link DetachedEdge}.
      */
-    final static class EdgeSerializer extends Serializer<Edge> {
+    final static class EdgeSerializer implements SerializerShim<Edge> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Edge edge) {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Edge edge) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(edge, true));
         }
 
         @Override
-        public Edge read(final Kryo kryo, final Input input, final Class<Edge> edgeClass) {
+        public <I extends InputShim> Edge read(KryoShim<I, ?> kryo, I input, Class<Edge> edgeClass) {
             final Object o = kryo.readClassAndObject(input);
             return (Edge) o;
         }
@@ -63,14 +63,14 @@ final class GryoSerializers {
     /**
      * Serializes any {@link Vertex} implementation encountered to an {@link DetachedVertex}.
      */
-    final static class VertexSerializer extends Serializer<Vertex> {
+    final static class VertexSerializer implements SerializerShim<Vertex> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Vertex vertex) {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Vertex vertex) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, true));
         }
 
         @Override
-        public Vertex read(final Kryo kryo, final Input input, final Class<Vertex> vertexClass) {
+        public <I extends InputShim> Vertex read(KryoShim<I, ?> kryo, I input, Class<Vertex> vertexClass) {
             return (Vertex) kryo.readClassAndObject(input);
         }
     }
@@ -78,14 +78,14 @@ final class GryoSerializers {
     /**
      * Serializes any {@link Property} implementation encountered to an {@link DetachedProperty}.
      */
-    final static class PropertySerializer extends Serializer<Property> {
+    final static class PropertySerializer implements SerializerShim<Property> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Property property) {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Property property) {
             kryo.writeClassAndObject(output, property instanceof VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : DetachedFactory.detach(property));
         }
 
         @Override
-        public Property read(final Kryo kryo, final Input input, final Class<Property> propertyClass) {
+        public <I extends InputShim> Property read(KryoShim<I, ?> kryo, I input, Class<Property> propertyClass) {
             return (Property) kryo.readClassAndObject(input);
         }
     }
@@ -93,14 +93,14 @@ final class GryoSerializers {
     /**
      * Serializes any {@link VertexProperty} implementation encountered to an {@link DetachedVertexProperty}.
      */
-    final static class VertexPropertySerializer extends Serializer<VertexProperty> {
+    final static class VertexPropertySerializer implements SerializerShim<VertexProperty> {
         @Override
-        public void write(final Kryo kryo, final Output output, final VertexProperty vertexProperty) {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexProperty vertexProperty) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(vertexProperty, true));
         }
 
         @Override
-        public VertexProperty read(final Kryo kryo, final Input input, final Class<VertexProperty> vertexPropertyClass) {
+        public <I extends InputShim> VertexProperty read(KryoShim<I, ?> kryo, I input, Class<VertexProperty> vertexPropertyClass) {
             return (VertexProperty) kryo.readClassAndObject(input);
         }
     }
@@ -108,14 +108,14 @@ final class GryoSerializers {
     /**
      * Serializes any {@link Path} implementation encountered to an {@link DetachedPath}.
      */
-    final static class PathSerializer extends Serializer<Path> {
+    public final static class PathSerializer implements SerializerShim<Path> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Path path) {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Path path) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(path, false));
         }
 
         @Override
-        public Path read(final Kryo kryo, final Input input, final Class<Path> pathClass) {
+        public <I extends InputShim> Path read(KryoShim<I, ?> kryo, I input, Class<Path> pathClass) {
             return (Path) kryo.readClassAndObject(input);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
index 1d4e236..8b14345 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java
@@ -18,10 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
-import org.apache.tinkerpop.shaded.kryo.Kryo;
-import org.apache.tinkerpop.shaded.kryo.Serializer;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -48,17 +48,14 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link Duration} class.
      */
-    final static class DurationSerializer extends Serializer<Duration>
-    {
+    final static class DurationSerializer implements SerializerShim<Duration> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Duration duration)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Duration duration) {
             output.writeLong(duration.toNanos());
         }
 
         @Override
-        public Duration read(final Kryo kryo, final Input input, final Class<Duration> durationClass)
-        {
+        public <I extends InputShim> Duration read(KryoShim<I, ?> kryo, I input, Class<Duration> durationClass) {
             return Duration.ofNanos(input.readLong());
         }
     }
@@ -66,18 +63,15 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link Instant} class.
      */
-    final static class InstantSerializer extends Serializer<Instant>
-    {
+    final static class InstantSerializer implements SerializerShim<Instant> {
         @Override
-        public void write(Kryo kryo, Output output, Instant instant)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Instant instant) {
             output.writeLong(instant.getEpochSecond());
             output.writeInt(instant.getNano());
         }
 
         @Override
-        public Instant read(Kryo kryo, Input input, Class<Instant> aClass)
-        {
+        public <I extends InputShim> Instant read(KryoShim<I, ?> kryo, I input, Class<Instant> aClass) {
             return Instant.ofEpochSecond(input.readLong(), input.readInt());
         }
     }
@@ -85,17 +79,14 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link LocalDate} class.
      */
-    final static class LocalDateSerializer extends Serializer<LocalDate>
-    {
+    final static class LocalDateSerializer implements SerializerShim<LocalDate> {
         @Override
-        public void write(final Kryo kryo, final Output output, final LocalDate localDate)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDate localDate) {
             output.writeLong(localDate.toEpochDay());
         }
 
         @Override
-        public LocalDate read(final Kryo kryo, final Input input, final Class<LocalDate> clazz)
-        {
+        public <I extends InputShim> LocalDate read(KryoShim<I, ?> kryo, I input, Class<LocalDate> clazz) {
             return LocalDate.ofEpochDay(input.readLong());
         }
     }
@@ -103,11 +94,9 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link LocalDateTime} class.
      */
-    final static class LocalDateTimeSerializer extends Serializer<LocalDateTime>
-    {
+    final static class LocalDateTimeSerializer implements SerializerShim<LocalDateTime> {
         @Override
-        public void write(final Kryo kryo, final Output output, final LocalDateTime localDateTime)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDateTime localDateTime) {
             output.writeInt(localDateTime.getYear());
             output.writeInt(localDateTime.getMonthValue());
             output.writeInt(localDateTime.getDayOfMonth());
@@ -118,8 +107,7 @@ final class JavaTimeSerializers {
         }
 
         @Override
-        public LocalDateTime read(final Kryo kryo, final Input input, final Class<LocalDateTime> clazz)
-        {
+        public <I extends InputShim> LocalDateTime read(KryoShim<I, ?> kryo, I input, Class<LocalDateTime> clazz) {
             return LocalDateTime.of(input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt());
         }
     }
@@ -127,17 +115,14 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link LocalTime} class.
      */
-    final static class LocalTimeSerializer extends Serializer<LocalTime>
-    {
+    final static class LocalTimeSerializer implements SerializerShim<LocalTime> {
         @Override
-        public void write(final Kryo kryo, final Output output, final LocalTime localTime)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalTime localTime) {
             output.writeLong(localTime.toNanoOfDay());
         }
 
         @Override
-        public LocalTime read(final Kryo kryo, final Input input, final Class<LocalTime> clazz)
-        {
+        public <I extends InputShim> LocalTime read(KryoShim<I, ?> kryo, I input, Class<LocalTime> clazz) {
             return LocalTime.ofNanoOfDay(input.readLong());
         }
     }
@@ -145,37 +130,31 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link MonthDay} class.
      */
-    final static class MonthDaySerializer extends Serializer<MonthDay>
-    {
+    final static class MonthDaySerializer implements SerializerShim<MonthDay> {
         @Override
-        public void write(final Kryo kryo, final Output output, final MonthDay monthDay)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, MonthDay monthDay) {
             output.writeInt(monthDay.getMonthValue());
             output.writeInt(monthDay.getDayOfMonth());
         }
 
         @Override
-        public MonthDay read(final Kryo kryo, final Input input, final Class<MonthDay> clazz)
-        {
-            return MonthDay.of(input.readInt(), input.readInt());
+        public <I extends InputShim> MonthDay read(KryoShim<I, ?> kryo, I input, Class<MonthDay> clazz) {
+            return null;
         }
     }
 
     /**
      * Serializer for the {@link OffsetDateTime} class.
      */
-    final static class OffsetDateTimeSerializer extends Serializer<OffsetDateTime>
-    {
+    final static class OffsetDateTimeSerializer implements SerializerShim<OffsetDateTime> {
         @Override
-        public void write(final Kryo kryo, final Output output, final OffsetDateTime offsetDateTime)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetDateTime offsetDateTime) {
             kryo.writeObject(output, offsetDateTime.toLocalDateTime());
             kryo.writeObject(output, offsetDateTime.getOffset());
         }
 
         @Override
-        public OffsetDateTime read(final Kryo kryo, final Input input, final Class<OffsetDateTime> clazz)
-        {
+        public <I extends InputShim> OffsetDateTime read(KryoShim<I, ?> kryo, I input, Class<OffsetDateTime> clazz) {
             return OffsetDateTime.of(kryo.readObject(input, LocalDateTime.class), kryo.readObject(input, ZoneOffset.class));
         }
     }
@@ -183,18 +162,15 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link OffsetTime} class.
      */
-    final static class OffsetTimeSerializer extends Serializer<OffsetTime>
-    {
+    final static class OffsetTimeSerializer implements SerializerShim<OffsetTime> {
         @Override
-        public void write(final Kryo kryo, final Output output, final OffsetTime offsetTime)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetTime offsetTime) {
             kryo.writeObject(output, offsetTime.toLocalTime());
             kryo.writeObject(output, offsetTime.getOffset());
         }
 
         @Override
-        public OffsetTime read(final Kryo kryo, final Input input, final Class<OffsetTime> clazz)
-        {
+        public <I extends InputShim> OffsetTime read(KryoShim<I, ?> kryo, I input, Class<OffsetTime> clazz) {
             return OffsetTime.of(kryo.readObject(input, LocalTime.class), kryo.readObject(input, ZoneOffset.class));
         }
     }
@@ -202,19 +178,16 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link Period} class.
      */
-    final static class PeriodSerializer extends Serializer<Period>
-    {
+    final static class PeriodSerializer implements SerializerShim<Period> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Period period)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Period period) {
             output.writeInt(period.getYears());
             output.writeInt(period.getMonths());
             output.writeInt(period.getDays());
         }
 
         @Override
-        public Period read(final Kryo kryo, final Input input, final Class<Period> clazz)
-        {
+        public <I extends InputShim> Period read(KryoShim<I, ?> kryo, I input, Class<Period> clazz) {
             return Period.of(input.readInt(), input.readInt(), input.readInt());
         }
     }
@@ -222,17 +195,14 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link Year} class.
      */
-    final static class YearSerializer extends Serializer<Year>
-    {
+    final static class YearSerializer implements SerializerShim<Year> {
         @Override
-        public void write(final Kryo kryo, final Output output, final Year year)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Year year) {
             output.writeInt(year.getValue());
         }
 
         @Override
-        public Year read(final Kryo kryo, final Input input, final Class<Year> clazz)
-        {
+        public <I extends InputShim> Year read(KryoShim<I, ?> kryo, I input, Class<Year> clazz) {
             return Year.of(input.readInt());
         }
     }
@@ -240,18 +210,15 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link YearMonth} class.
      */
-    final static class YearMonthSerializer extends Serializer<YearMonth>
-    {
+    final static class YearMonthSerializer implements SerializerShim<YearMonth> {
         @Override
-        public void write(final Kryo kryo, final Output output, final YearMonth monthDay)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, YearMonth monthDay) {
             output.writeInt(monthDay.getYear());
             output.writeInt(monthDay.getMonthValue());
         }
 
         @Override
-        public YearMonth read(final Kryo kryo, final Input input, final Class<YearMonth> clazz)
-        {
+        public <I extends InputShim> YearMonth read(KryoShim<I, ?> kryo, I input, Class<YearMonth> clazz) {
             return YearMonth.of(input.readInt(), input.readInt());
         }
     }
@@ -259,11 +226,9 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link ZonedDateTime} class.
      */
-    final static class ZonedDateTimeSerializer extends Serializer<ZonedDateTime>
-    {
+    final static class ZonedDateTimeSerializer implements SerializerShim<ZonedDateTime> {
         @Override
-        public void write(final Kryo kryo, final Output output, final ZonedDateTime zonedDateTime)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZonedDateTime zonedDateTime) {
             output.writeInt(zonedDateTime.getYear());
             output.writeInt(zonedDateTime.getMonthValue());
             output.writeInt(zonedDateTime.getDayOfMonth());
@@ -275,8 +240,7 @@ final class JavaTimeSerializers {
         }
 
         @Override
-        public ZonedDateTime read(final Kryo kryo, final Input input, final Class<ZonedDateTime> clazz)
-        {
+        public <I extends InputShim> ZonedDateTime read(KryoShim<I, ?> kryo, I input, Class<ZonedDateTime> clazz) {
             return ZonedDateTime.of(input.readInt(), input.readInt(), input.readInt(),
                     input.readInt(), input.readInt(), input.readInt(), input.readInt(),
                     ZoneId.of(input.readString()));
@@ -286,17 +250,14 @@ final class JavaTimeSerializers {
     /**
      * Serializer for the {@link ZoneOffset} class.
      */
-    final static class ZoneOffsetSerializer extends Serializer<ZoneOffset>
-    {
+    final static class ZoneOffsetSerializer implements SerializerShim<ZoneOffset> {
         @Override
-        public void write(final Kryo kryo, final Output output, final ZoneOffset zoneOffset)
-        {
+        public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZoneOffset zoneOffset) {
             output.writeString(zoneOffset.getId());
         }
 
         @Override
-        public ZoneOffset read(final Kryo kryo, final Input input, final Class<ZoneOffset> clazz)
-        {
+        public <I extends InputShim> ZoneOffset read(KryoShim<I, ?> kryo, I input, Class<ZoneOffset> clazz) {
             return ZoneOffset.of(input.readString());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
index e5e92e7..0464b22 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo;
 
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.Serializer;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
@@ -27,16 +31,15 @@ import org.javatuples.Pair;
 /**
  * @author Daniel Kuppitz (http://gremlin.guru)
  */
-final class PairSerializer extends Serializer<Pair> {
-
+final class PairSerializer implements SerializerShim<Pair> {
     @Override
-    public void write(final Kryo kryo, final Output output, final Pair pair) {
+    public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Pair pair) {
         kryo.writeClassAndObject(output, pair.getValue0());
         kryo.writeClassAndObject(output, pair.getValue1());
     }
 
     @Override
-    public Pair read(final Kryo kryo, final Input input, final Class<Pair> pairClass) {
+    public <I extends InputShim> Pair read(KryoShim<I, ?> kryo, I input, Class<Pair> pairClass) {
         return Pair.with(kryo.readClassAndObject(input), kryo.readClassAndObject(input));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
index ef105ce..1f41c0d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java
@@ -67,4 +67,16 @@ public interface TypeRegistration<T> {
      * @return the sole parameter
      */
     Kryo registerWith(Kryo kryo);
+
+    /**
+     * Returns true if at least one of {@link #getShadedSerializer()}, {@link #getSerializerShim()}, or
+     * {@link #getFunctionOfShadedKryo()} is non null.  Returns false if all are null.
+     *
+     * @return whether a serializer is defined for this type registration
+     */
+    default boolean hasSerializer() {
+        return null != getFunctionOfShadedKryo() ||
+                null != getSerializerShim() ||
+                null != getShadedSerializer();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index 959605c..7783856 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -18,6 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
 
+import org.apache.commons.configuration.Configuration;
+
 import java.io.InputStream;
 import java.io.OutputStream;
 
@@ -80,4 +82,18 @@ public interface KryoShimService {
      * @return this implementation's priority value
      */
     int getPriority();
+
+    /**
+     * Attempt to incorporate the supplied configuration in future read/write calls.
+     * <p>
+     * This method is a wart that exists essentially just to support the old
+     * {@link HadoopPools#initialize(Configuration)} use-case.
+     * <p>
+     * This method is not guaranteed to have any effect on an instance of this interface
+     * after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)}
+     * has been invoked on that particular instance.
+     *
+     * @param conf the configuration to apply to this service's internal serializer
+     */
+    void applyConfiguration(Configuration conf);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index 9ccf2de..9184dd0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.slf4j.Logger;
@@ -35,7 +36,9 @@ import java.util.ServiceLoader;
  */
 public class KryoShimServiceLoader {
 
-    private static volatile KryoShimService CACHED_SHIM_SERVICE;
+    private static volatile KryoShimService cachedShimService;
+
+    private static volatile Configuration conf;
 
     private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class);
 
@@ -46,6 +49,10 @@ public class KryoShimServiceLoader {
      */
     public static final String SHIM_CLASS_SYSTEM_PROPERTY = "tinkerpop.kryo.shim";
 
+    public static void applyConfiguration(Configuration conf) {
+        KryoShimServiceLoader.conf = conf;
+    }
+
     /**
      * Return a reference to the shim service.  This method may return a cached shim service
      * unless {@code forceReload} is true.  Calls to this method need not be externally
@@ -58,8 +65,8 @@ public class KryoShimServiceLoader {
      */
     public static KryoShimService load(boolean forceReload) {
 
-        if (null != CACHED_SHIM_SERVICE && !forceReload) {
-            return CACHED_SHIM_SERVICE;
+        if (null != cachedShimService && !forceReload) {
+            return cachedShimService;
         }
 
         ArrayList<KryoShimService> services = new ArrayList<>();
@@ -109,7 +116,15 @@ public class KryoShimServiceLoader {
         log.info("Set {} provider to {} ({}) because its priority value ({}) is the highest available",
                 KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority());
 
-        return CACHED_SHIM_SERVICE = result;
+        Configuration userConf = conf;
+
+        if (null != userConf) {
+            log.info("Configuring {} provider {} with user-provided configuration",
+                    KryoShimService.class.getSimpleName(), result);
+            result.applyConfiguration(userConf);
+        }
+
+        return cachedShimService = result;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
index 191cdd8..e5f9005 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java
@@ -26,7 +26,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
  */
 public interface SerializerShim<T> {
 
-    <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T starGraph);
+    <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T object);
 
     <I extends InputShim> T read(KryoShim<I, ?> kryo, I input, Class<T> clazz);
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
index de1e2f9..06778e6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public final class HadoopCombine extends Reducer<ObjectWritable, ObjectWritable,
     @Override
     public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
-        HadoopPools.initialize(apacheConfiguration);
+        KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
         this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
         this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
index 9e6fac3..5fc7026 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ public final class HadoopMap extends Mapper<NullWritable, VertexWritable, Object
     @Override
     public void setup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
-        HadoopPools.initialize(apacheConfiguration);
+        KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
         this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
         this.mapReduce.workerStart(MapReduce.Stage.MAP);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
index 06dfba1..6ca7b8f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public final class HadoopReduce extends Reducer<ObjectWritable, ObjectWritable,
     @Override
     public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration());
-        HadoopPools.initialize(apacheConfiguration);
+        KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
         this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
         this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index c19b914..5753d90 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -18,7 +18,9 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
@@ -66,4 +68,9 @@ public class HadoopPoolShimService implements KryoShimService {
     public int getPriority() {
         return 0;
     }
+
+    @Override
+    public void applyConfiguration(Configuration conf) {
+        KryoShimServiceLoader.applyConfiguration(conf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
index f3a1bac..0e5f135 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -28,7 +30,7 @@ public interface HadoopPoolsConfigurable extends Configurable {
 
     @Override
     public default void setConf(final Configuration configuration) {
-        HadoopPools.initialize(configuration);
+        KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index d7ed46b..a1daddf 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -71,7 +72,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
         final Configuration configuration = context.getConfiguration();
         if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
             this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-        HadoopPools.initialize(configuration);
+        KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
         this.gryoReader = HadoopPools.getGryoPool().takeReader();
         long start = split.getStart();
         final Path file = split.getPath();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
index 67a8339..2ea3394 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -43,7 +45,7 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri
     public GryoRecordWriter(final DataOutputStream outputStream, final Configuration configuration) {
         this.outputStream = outputStream;
         this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true);
-        HadoopPools.initialize(configuration);
+        KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration));
         this.gryoWriter = HadoopPools.getGryoPool().takeWriter();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index c2b85dd..9e5ac53 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -38,6 +38,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -89,7 +90,7 @@ public final class SparkExecutor {
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
+                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
deleted file mode 100644
index 4c99e70..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-import org.apache.spark.serializer.KryoRegistrator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.ObjectWritableSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A spark.kryo.registrator implementation that installs TinkerPop types.
- * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer.
- */
-public class TinkerPopKryoRegistrator implements KryoRegistrator {
-
-    private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
-
-    @Override
-    public void registerClasses(Kryo kryo) {
-        // TinkerPop type registrations copied from GyroSerializer's constructor
-        kryo.register(MessagePayload.class);
-        kryo.register(ViewIncomingPayload.class);
-        kryo.register(ViewOutgoingPayload.class);
-        kryo.register(ViewPayload.class);
-        kryo.register(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
-        kryo.register(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
-
-        Set<Class<?>> shimmedClasses = new HashSet<>();
-
-        Set<Class<?>> javaSerializationClasses = new HashSet<>();
-
-        // Copy GryoMapper's default registrations
-        for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) {
-            // Special case for JavaSerializer, which is generally implemented in terms of TinkerPop's
-            // problematic static GryoMapper/GryoSerializer pool (these are handled below the loop)
-            org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
-            SerializerShim<?> serializerShim = tr.getSerializerShim();
-            if (null != shadedSerializer &&
-                    shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
-                javaSerializationClasses.add(tr.getTargetClass());
-            } else if (null != serializerShim) {
-                log.debug("Registering class {} to serializer shim {} (serializer shim class {})",
-                        tr.getTargetClass(), serializerShim, serializerShim.getClass());
-                kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim));
-                shimmedClasses.add(tr.getTargetClass());
-            } else {
-                // Register with the default behavior (FieldSerializer)
-                log.debug("Registering class {} with default serializer", tr.getTargetClass());
-                kryo.register(tr.getTargetClass());
-            }
-        }
-
-        Map<Class<?>, Serializer<?>> javaSerializerReplacements = new HashMap<>();
-        javaSerializerReplacements.put(GroupStep.GroupBiOperator.class, new JavaSerializer());
-        javaSerializerReplacements.put(OrderGlobalStep.OrderBiOperator.class, null);
-        javaSerializerReplacements.put(TraversalExplanation.class, null);
-
-        for (Map.Entry<Class<?>, Serializer<?>> e : javaSerializerReplacements.entrySet()) {
-            Class<?> c = e.getKey();
-            Serializer<?> s = e.getValue();
-
-            if (javaSerializationClasses.remove(c)) {
-                if (null != s) {
-                    log.debug("Registering class {} with serializer {}", c, s);
-                    kryo.register(c, s);
-                } else {
-                    log.debug("Registering class {} with default serializer", c);
-                    kryo.register(c);
-                }
-            } else {
-                log.debug("Registering class {} with JavaSerializer", c);
-                kryo.register(c, new JavaSerializer());
-            }
-        }
-
-        // We really care about StarGraph's shim serializer, so make sure we registered it
-        if (!shimmedClasses.contains(StarGraph.class)) {
-            log.warn("No SerializerShim found for StarGraph");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 2c1dfa2..28a4d55 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -78,7 +78,7 @@ public final class GryoSerializer extends Serializer {
             }
         }
         this.gryoPool = GryoPool.build().
-                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
+                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
                 ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
new file mode 100644
index 0000000..8b21e21
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ * <p>
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}.
+ */
+public class IoRegistryAwareKryoSerializer extends KryoSerializer {
+
+    private final SparkConf conf;
+
+    private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class);
+
+    public IoRegistryAwareKryoSerializer(SparkConf conf) {
+        super(conf);
+        // store conf so that we can access its registry (if one is present) in newKryo()
+        this.conf = conf;
+    }
+
+    @Override
+    public Kryo newKryo() {
+        Kryo kryo = super.newKryo();
+
+        return applyIoRegistryIfPresent(kryo);
+    }
+
+    private Kryo applyIoRegistryIfPresent(Kryo kryo) {
+        if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) {
+            log.info("SparkConf {} does not contain setting {}, skipping {} handling",
+                    GryoPool.CONFIG_IO_REGISTRY, conf, IoRegistry.class.getCanonicalName());
+            return kryo;
+        }
+
+        String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY);
+
+        for (String registryClassname : registryClassnames.split(",")) {
+            final IoRegistry registry;
+
+            try {
+                registry = (IoRegistry) Class.forName(registryClassname).newInstance();
+                log.info("Instantiated {}", registryClassname);
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                log.error("Unable to reflectively instantiate the {} implementation named {}",
+                        IoRegistry.class.getCanonicalName(), registryClassname, e);
+                return kryo;
+            }
+
+            // Left is the class targeted for serialization, right is a mess of potential types, including
+            // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer>
+            final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
+
+            if (null == serializers) {
+                log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry",
+                        GryoIo.class.getCanonicalName(), registry);
+                return kryo;
+            }
+
+            for (Pair<Class, Object> p : serializers) {
+                if (null == p.getValue1()) {
+                    // null on the right is fine
+                    log.info("Registering {} with default serializer", p.getValue0());
+                    kryo.register(p.getValue0());
+                } else if (p.getValue1() instanceof Serializer) {
+                    // unshaded serializer on the right is fine
+                    log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1());
+                    kryo.register(p.getValue0(), (Serializer) p.getValue1());
+                } else {
+                    // anything else on the right is unsupported with Spark
+                    log.error("Serializer {} found in {} must implement {} " +
+                                    "(the shaded interface {} is not supported on Spark).  This class will be registered with " +
+                                    "the default behavior of Spark's KryoSerializer.",
+                            p.getValue1(), registryClassname, Serializer.class.getCanonicalName(),
+                            org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+                    kryo.register(p.getValue0());
+                }
+            }
+        }
+
+        return kryo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
new file mode 100644
index 0000000..bdb80fd
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * A spark.kryo.registrator implementation that installs TinkerPop types.
+ * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer.
+ */
+public class TinkerPopKryoRegistrator implements KryoRegistrator {
+
+    private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+        registerClasses(kryo, Collections.emptyMap(), Collections.emptySet());
+    }
+
+    /**
+     * Register TinkerPop's classes with the supplied {@link Kryo} instance
+     * while honoring optional overrides and optional class blacklist ("blackset"?).
+     *
+     * @param kryo the Kryo serializer instance with which to register types
+     * @param serializerOverrides serializer mappings that override this class's defaults
+     * @param blacklist classes which should not be registered at all, even if there is an override entry
+     *                  or if they would be registered by this class by default (does not affect Kryo's
+     *                  built-in registrations, e.g. String.class).
+     */
+    public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> serializerOverrides, Set<Class<?>> blacklist) {
+        // Apply TinkerPop type registrations copied from GyroSerializer's constructor
+        for (Map.Entry<Class<?>, Serializer<?>> ent : getExtraRegistrations().entrySet()) {
+            Class<?> targetClass = ent.getKey();
+            Serializer<?> ser = ent.getValue();
+
+            // Is this class blacklisted?  Skip it. (takes precedence over serializerOverrides)
+            if (blacklist.contains(targetClass)) {
+                log.debug("Not registering serializer for {} (blacklisted)", targetClass);
+                continue;
+            }
+
+            if (checkForAndApplySerializerOverride(serializerOverrides, kryo, targetClass)) {
+                // do nothing but skip the remaining else(-if) clauses
+            } else if (null == ser) {
+                log.debug("Registering {} with default serializer", targetClass);
+                kryo.register(targetClass);
+            } else {
+                log.debug("Registering {} with serializer {}", targetClass, ser);
+                kryo.register(targetClass, ser);
+            }
+        }
+
+        Set<Class<?>> shimmedClassesFromGryoMapper = new HashSet<>();
+
+        // Apply GryoMapper's default registrations
+        for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) {
+            // Is this class blacklisted?  Skip it. (takes precedence over serializerOverrides)
+            if (blacklist.contains(tr.getTargetClass())) {
+                log.debug("Not registering serializer for {} (blacklisted)", tr.getTargetClass());
+                continue;
+            }
+
+            final org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
+            final SerializerShim<?> serializerShim = tr.getSerializerShim();
+            final java.util.function.Function<
+                    org.apache.tinkerpop.shaded.kryo.Kryo,
+                    org.apache.tinkerpop.shaded.kryo.Serializer> functionOfShadedKryo = tr.getFunctionOfShadedKryo();
+
+            // Apply overrides with the highest case-precedence
+            if (checkForAndApplySerializerOverride(serializerOverrides, kryo, tr.getTargetClass())) {
+                // do nothing but skip the remaining else(-if) clauses
+            } else if (null != shadedSerializer) {
+                if (shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
+                    // Convert GryoMapper's shaded JavaSerializer mappings to their unshaded equivalents
+                    log.debug("Registering {} with JavaSerializer", tr.getTargetClass());
+                    kryo.register(tr.getTargetClass(), new JavaSerializer());
+                } else {
+                    // There's supposed to be a check in GryoMapper that prevents this from happening
+                    log.error("GryoMapper's default serialization registration for {} is a {}. " +
+                              "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+                              "I am configuring Spark to use Kryo's default serializer for this class, " +
+                              "but this may cause serialization failures at runtime.",
+                              tr.getTargetClass(),
+                              org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+                    kryo.register(tr.getTargetClass());
+                }
+            } else if (null != serializerShim) {
+                // Wrap shim serializers in an adapter for Spark's unshaded Kryo
+                log.debug("Registering {} to serializer shim {} (serializer shim {})",
+                        tr.getTargetClass(), serializerShim, serializerShim.getClass());
+                kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim));
+                shimmedClassesFromGryoMapper.add(tr.getTargetClass());
+            } else if (null != functionOfShadedKryo) {
+                // As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening
+                log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>.  " +
+                          "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+                          "I am configuring Spark to use Kryo's default serializer instead of this function, " +
+                          "but this may cause serialization failures at runtime.",
+                          tr.getTargetClass(),
+                          org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
+                          org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+                kryo.register(tr.getTargetClass());
+            } else {
+                // Register all other classes with the default behavior (FieldSerializer)
+                log.debug("Registering {} with default serializer", tr.getTargetClass());
+                kryo.register(tr.getTargetClass());
+            }
+        }
+
+        // StarGraph's shim serializer is especially important on Spark for efficiency reasons,
+        // so log a warning if we failed to register it somehow
+        if (!shimmedClassesFromGryoMapper.contains(StarGraph.class)) {
+            log.warn("No SerializerShim found for StarGraph");
+        }
+    }
+
+    private LinkedHashMap<Class<?>, Serializer<?>> getExtraRegistrations() {
+
+        /* The map returned by this method MUST have a fixed iteration order!
+         *
+         * The order itself is irrelevant, so long as it is completely stable at runtime.
+         *
+         * LinkedHashMap satisfies this requirement (its contract specifies
+         * iteration in key-insertion-order).
+         */
+
+        LinkedHashMap<Class<?>, Serializer<?>> m = new LinkedHashMap<>();
+        // The following entries were copied from GryoSerializer's constructor
+        // This could be turned into a static collection on GryoSerializer to avoid
+        // duplication, but it would be a bit cumbersome to do so without disturbing
+        // the ordering of the existing entries in that constructor, since not all
+        // of the entries are for TinkerPop (and the ordering is significant).
+        m.put(MessagePayload.class, null);
+        m.put(ViewIncomingPayload.class, null);
+        m.put(ViewOutgoingPayload.class, null);
+        m.put(ViewPayload.class, null);
+        m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
+        m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
+
+        return m;
+    }
+
+    private boolean checkForAndApplySerializerOverride(Map<Class<?>, Serializer<?>> serializerOverrides,
+                                                       Kryo kryo, Class<?> targetClass) {
+        if (serializerOverrides.containsKey(targetClass)) {
+            Serializer<?> ser = serializerOverrides.get(targetClass);
+            if (null == ser) {
+                // null means use Kryo's default serializer
+                log.debug("Registering {} with default serializer per overrides", targetClass);
+                kryo.register(targetClass);
+            } else {
+                // nonnull means use that serializer
+                log.debug("Registering {} with serializer {} per overrides", targetClass, ser);
+                kryo.register(targetClass, ser);
+            }
+            return true;
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index d0411e8..a524a97 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -24,92 +24,131 @@
  */
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
 
-import com.twitter.chill.KryoInstantiator;
-import com.twitter.chill.KryoPool;
-import com.twitter.chill.SerDeState;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.TinkerPopKryoRegistrator;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class UnshadedKryoShimService implements KryoShimService {
 
-    public static final String SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY = "tinkerpop.kryo.poolsize";
-
     private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
-    private static final int SPARK_KRYO_POOL_SIZE_DEFAULT = 8;
 
-    private final KryoSerializer sparkKryoSerializer;
-    private final KryoPool kryoPool;
+    private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
 
-    public UnshadedKryoShimService() {
-        this(TinkerPopKryoRegistrator.class.getCanonicalName(), getDefaultKryoPoolSize());
-    }
+    private static volatile boolean initialized;
 
-    public UnshadedKryoShimService(String sparkKryoRegistratorClassname, int kryoPoolSize) {
-        SparkConf sparkConf = new SparkConf();
-        sparkConf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-        sparkConf.set("spark.kryo.registrator", sparkKryoRegistratorClassname);
-        sparkKryoSerializer = new KryoSerializer(sparkConf);
-        kryoPool = KryoPool.withByteArrayOutputStream(kryoPoolSize, new KryoInstantiator());
-    }
+    public UnshadedKryoShimService() { }
 
     @Override
     public Object readClassAndObject(InputStream source) {
-        SerDeState sds = null;
-        try {
-            sds = kryoPool.borrow();
 
-            sds.setInput(source);
+        LinkedBlockingQueue<Kryo> kryos = initialize();
 
-            return sds.readClassAndObject();
+        Kryo k = null;
+        try {
+            k = kryos.take();
+
+            return k.readClassAndObject(new Input(source));
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         } finally {
-            kryoPool.release(sds);
+            try {
+                kryos.put(k);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
     @Override
     public void writeClassAndObject(Object o, OutputStream sink) {
-        SerDeState sds = null;
+
+        LinkedBlockingQueue<Kryo> kryos = initialize();
+
+        Kryo k = null;
         try {
-            sds = kryoPool.borrow();
+            k = kryos.take();
 
-            sds.writeClassAndObject(o); // this writes to an internal buffer
+            Output kryoOutput = new Output(sink);
 
-            sds.writeOutputTo(sink); // this copies the internal buffer to sink
+            k.writeClassAndObject(kryoOutput, o);
 
-            sink.flush();
-        } catch (IOException e) {
+            kryoOutput.flush();
+        } catch (InterruptedException e) {
             throw new RuntimeException(e);
         } finally {
-            kryoPool.release(sds);
+            try {
+                kryos.put(k);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
     @Override
     public int getPriority() {
-        return 1024;
+        return 50;
     }
 
-    private static int getDefaultKryoPoolSize() {
-        String raw = System.getProperty(SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
+    @Override
+    public void applyConfiguration(Configuration conf) {
+        initialize(conf);
+    }
 
-        int size = SPARK_KRYO_POOL_SIZE_DEFAULT;
-        try {
-            size = Integer.valueOf(raw);
-            log.info("Setting kryo pool size to {} according to system property {}", size,
-                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
-        } catch (NumberFormatException e) {
-            log.error("System property {}={} could not be parsed as an integer, using default value {}",
-                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY, raw, size, e);
+    private LinkedBlockingQueue<Kryo> initialize() {
+        return initialize(new BaseConfiguration());
+    }
+
+    private LinkedBlockingQueue<Kryo> initialize(Configuration conf) {
+        // DCL is safe in this case due to volatility
+        if (!initialized) {
+            synchronized (UnshadedKryoShimService.class) {
+                if (!initialized) {
+                    SparkConf sparkConf = new SparkConf();
+
+                    // Copy the user's IoRegistry from the param conf to the SparkConf we just created
+                    String regStr = conf.getString(GryoPool.CONFIG_IO_REGISTRY);
+                    if (null != regStr) { // SparkConf rejects null values with NPE, so this has to be checked before set(...)
+                        sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
+                    }
+                    // Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt
+                    sparkConf.set("spark.serializer", IoRegistryAwareKryoSerializer.class.getCanonicalName());
+
+                    String registrator = conf.getString("spark.kryo.registrator");
+                    if (null != registrator) {
+                        sparkConf.set("spark.kryo.registrator", registrator);
+                        log.info("Copied spark.kryo.registrator: {}", registrator);
+                    } else {
+                        log.info("Not copying spark.kryo.registrator");
+                    }
+
+                    // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
+                    int poolSize = conf.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
+                            GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+                    // Instantiate the spark.serializer
+                    final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf);
+                    // Setup a pool backed by our spark.serializer instance
+
+                    for (int i = 0; i < poolSize; i++) {
+                        KRYOS.add(ioReg.newKryo());
+                    }
+
+                    initialized = true;
+                }
+            }
         }
 
-        return size;
+        return KRYOS;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
new file mode 100644
index 0000000..68712a6
--- /dev/null
+++ b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService # Supports Spark



Mime
View raw message