tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [27/29] tinkerpop git commit: added SystemUtil to gremlin-core which is able to generate Configurations from System.properties. Useful in Hadoop GraphComputer settings where JVMs can have IO configuration information in them. HadoopPools will now try and
Date Wed, 26 Oct 2016 14:18:59 GMT
added SystemUtil to gremlin-core which is able to generate Configurations from System.properties.
Useful in Hadoop GraphComputer settings where JVMs can have IO configuration information in
them. HadoopPools will now try and generate a Configuration from System.properties if no configuration
is explicitly provided. Likewise for KryoShimServices (via KryoShimServiceLoader). SparkGraphComputer
will spawn its driver and executors using the gremlin.* and spark.* System properties taken from
the SparkConf. Lots of knick-knack cleanups to the shim service scene. cc/ @dalaro.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/982d2207
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/982d2207
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/982d2207

Branch: refs/heads/TINKERPOP-1389
Commit: 982d2207bc5840f0f3c83de480990af073e80122
Parents: 8367ab3
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Oct 26 07:18:21 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Oct 26 08:16:26 2016 -0600

----------------------------------------------------------------------
 .../io/gryo/kryoshim/KryoShimServiceLoader.java | 108 ++++++++-----------
 .../tinkerpop/gremlin/util/SystemUtil.java      |  55 ++++++++++
 .../tinkerpop/gremlin/util/SystemUtilTest.java  |  89 +++++++++++++++
 .../hadoop/structure/io/HadoopPools.java        |  19 +---
 .../process/computer/SparkGraphComputer.java    |  18 +++-
 .../unshaded/UnshadedKryoShimService.java       |  56 ++++------
 ...SparkGremlinGryoSerializerIntegrateTest.java |  33 ++++++
 7 files changed, 259 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index f9e4c2e..7b67328 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.tinkerpop.gremlin.util.SystemUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,8 +49,12 @@ public class KryoShimServiceLoader {
     public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService";
 
     public static void applyConfiguration(final Configuration configuration) {
-        KryoShimServiceLoader.configuration = configuration;
-        load(true);
+        if (null == KryoShimServiceLoader.configuration ||
+                !ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration)))
{
+            KryoShimServiceLoader.configuration = configuration;
+            load(true);
+        } else
+            load(false);
     }
 
     /**
@@ -62,78 +68,54 @@ public class KryoShimServiceLoader {
      * @return the shim service
      */
     public static KryoShimService load(final boolean forceReload) {
-
-        if (null != cachedShimService && !forceReload) {
+        // if the service is loaded and doesn't need reloading, simply return it
+        if (null != cachedShimService && !forceReload)
             return cachedShimService;
-        }
-
-        final ArrayList<KryoShimService> services = new ArrayList<>();
 
-        final ServiceLoader<KryoShimService> sl = ServiceLoader.load(KryoShimService.class);
-
-        KryoShimService result = null;
+        // if the configuration is null, try and load the configuration from System.properties
+        if (null == configuration)
+            configuration = SystemUtil.getSystemPropertiesConfiguration("gremlin", true);
 
+        // get all of the shim services
+        final ArrayList<KryoShimService> services = new ArrayList<>();
+        final ServiceLoader<KryoShimService> serviceLoader = ServiceLoader.load(KryoShimService.class);
         synchronized (KryoShimServiceLoader.class) {
-            if (forceReload) {
-                sl.reload();
-            }
-
-            for (KryoShimService kss : sl) {
+            if (forceReload) serviceLoader.reload();
+            for (final KryoShimService kss : serviceLoader) {
                 services.add(kss);
             }
         }
-
-        String shimClass = null != configuration && configuration.containsKey(KRYO_SHIM_SERVICE)
?
-                configuration.getString(KRYO_SHIM_SERVICE) :
-                System.getProperty(KRYO_SHIM_SERVICE);
-
-        if (null != shimClass) {
-            for (KryoShimService kss : services) {
-                if (kss.getClass().getCanonicalName().equals(shimClass)) {
+        // if a shim service class is specified in the configuration, use it -- else, priority-based
+        if (configuration.containsKey(KRYO_SHIM_SERVICE)) {
+            for (final KryoShimService kss : services) {
+                if (kss.getClass().getCanonicalName().equals(configuration.getString(KRYO_SHIM_SERVICE)))
{
                     log.info("Set {} provider to {} ({}) from system property {}={}",
                             KryoShimService.class.getSimpleName(), kss, kss.getClass(),
-                            KRYO_SHIM_SERVICE, shimClass);
-                    result = kss;
+                            KRYO_SHIM_SERVICE, configuration.getString(KRYO_SHIM_SERVICE));
+                    cachedShimService = kss;
+                    break;
                 }
             }
         } else {
             Collections.sort(services, KryoShimServiceComparator.INSTANCE);
-
-            for (KryoShimService kss : services) {
+            for (final KryoShimService kss : services) {
                 log.debug("Found Kryo shim service class {} (priority {})", kss.getClass(),
kss.getPriority());
             }
-
             if (0 != services.size()) {
-                result = services.get(services.size() - 1);
-
+                cachedShimService = services.get(services.size() - 1);
                 log.info("Set {} provider to {} ({}) because its priority value ({}) is the
best available",
-                        KryoShimService.class.getSimpleName(), result, result.getClass(),
result.getPriority());
+                        KryoShimService.class.getSimpleName(), cachedShimService, cachedShimService.getClass(),
cachedShimService.getPriority());
             }
         }
 
-
-        if (null == result) {
+        // no shim service was available
+        if (null == cachedShimService)
             throw new IllegalStateException("Unable to load KryoShimService");
-        }
-
-        final Configuration userConf = configuration;
-
-        if (null != userConf) {
-            log.info("Configuring {} provider {} with user-provided configuration",
-                    KryoShimService.class.getSimpleName(), result);
-            result.applyConfiguration(userConf);
-        }
 
-        return cachedShimService = result;
-    }
-
-    /**
-     * Equivalent to {@link #load(boolean)} with the parameter {@code true}.
-     *
-     * @return the (possibly cached) shim service
-     */
-    public static KryoShimService load() {
-        return load(false);
+        // once the shim service is defined, configure it
+        log.info("Configuring {} provider {} with user-provided configuration", KryoShimService.class.getSimpleName(),
cachedShimService.getClass().getCanonicalName());
+        cachedShimService.applyConfiguration(configuration);
+        return cachedShimService;
     }
 
     /**
@@ -141,16 +123,13 @@ public class KryoShimServiceLoader {
      * where the {@code output} parameter is an internally-created {@link ByteArrayOutputStream}.
 Returns
      * the byte array underlying that stream.
      *
-     * @param o an object for which the instance and class are serialized
+     * @param object an object for which the instance and class are serialized
      * @return the serialized form
      */
-    public static byte[] writeClassAndObjectToBytes(final Object o) {
-        final KryoShimService shimService = load();
-
+    public static byte[] writeClassAndObjectToBytes(final Object object) {
+        final KryoShimService shimService = load(false);
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        shimService.writeClassAndObject(o, baos);
-
+        shimService.writeClassAndObject(object, baos);
         return baos.toByteArray();
     }
 
@@ -158,14 +137,13 @@ public class KryoShimServiceLoader {
      * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject},
      * where the {@code input} parameter is {@code source}.  Returns the deserialized object.
      *
-     * @param source an input stream containing data for a serialized object class and instance
-     * @param <T>    the type to which the deserialized object is cast as it is returned
+     * @param inputStream an input stream containing data for a serialized object class and
instance
+     * @param <T>         the type to which the deserialized object is cast as it is
returned
      * @return the deserialized object
      */
-    public static <T> T readClassAndObject(final InputStream source) {
-        final KryoShimService shimService = load();
-
-        return (T) shimService.readClassAndObject(source);
+    public static <T> T readClassAndObject(final InputStream inputStream) {
+        final KryoShimService shimService = load(false);
+        return (T) shimService.readClassAndObject(inputStream);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/SystemUtil.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/SystemUtil.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/SystemUtil.java
new file mode 100644
index 0000000..bc2c5ff
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/SystemUtil.java
@@ -0,0 +1,55 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.util;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SystemUtil {
+
+    private SystemUtil() {
+    }
+
+    /**
+     * Generate a {@link Configuration} from the {@link System#getProperties}.
+     * Only those properties with specified prefix key are aggregated.
+     * If {@code trimPrefix} is true, the prefix and its trailing '.' are stripped from each key.
+     *
+     * @param prefix     the prefix of the keys to include in the configuration
+     * @param trimPrefix whether to trim the prefix + . from the key
+     * @return a configuration generated from the System properties
+     */
+    public static Configuration getSystemPropertiesConfiguration(final String prefix, final
boolean trimPrefix) {
+        final BaseConfiguration apacheConfiguration = new BaseConfiguration();
+        apacheConfiguration.setDelimiterParsingDisabled(true);
+        for (final Map.Entry<Object, Object> entry : System.getProperties().entrySet())
{
+            final String key = entry.getKey().toString();
+            final Object value = entry.getValue();
+            if (key.startsWith(prefix + "."))
+                apacheConfiguration.setProperty(trimPrefix ? key.substring(prefix.length()
+ 1) : key, value);
+        }
+        return apacheConfiguration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/SystemUtilTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/SystemUtilTest.java
b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/SystemUtilTest.java
new file mode 100644
index 0000000..e5dc705
--- /dev/null
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/SystemUtilTest.java
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SystemUtilTest {
+
+    @Test
+    public void shouldLoadSystemProperties() {
+        System.setProperty("blah.aa", "1");
+        System.setProperty("blah.b", "true");
+        System.setProperty("blah.c", "three");
+        System.setProperty("bleep.d", "false");
+        Configuration configuration = SystemUtil.getSystemPropertiesConfiguration("blah",
false);
+        assertEquals(3, IteratorUtils.count(configuration.getKeys()));
+        assertEquals(1, configuration.getInt("blah.aa"));
+        assertTrue(configuration.getBoolean("blah.b"));
+        assertEquals("three", configuration.getProperty("blah.c"));
+        assertFalse(configuration.containsKey("d") || configuration.containsKey("bleep.d"));
+        System.clearProperty("blah.aa");
+        System.clearProperty("blah.b");
+        System.clearProperty("blah.c");
+        System.clearProperty("bleep.d");
+    }
+
+    @Test
+    public void shouldTrimSystemPropertyPrefixes() {
+        System.setProperty("blah.a", "1");
+        System.setProperty("blah.bbb", "true");
+        System.setProperty("blah.c", "three");
+        System.setProperty("bleep.d", "false");
+        Configuration configuration = SystemUtil.getSystemPropertiesConfiguration("blah",
true);
+        assertEquals(3, IteratorUtils.count(configuration.getKeys()));
+        assertEquals(1, configuration.getInt("a"));
+        assertTrue(configuration.getBoolean("bbb"));
+        assertEquals("three", configuration.getProperty("c"));
+        assertFalse(configuration.containsKey("d") || configuration.containsKey("bleep.d"));
+        System.clearProperty("blah.a");
+        System.clearProperty("blah.bbb");
+        System.clearProperty("blah.c");
+        System.clearProperty("bleep.d");
+    }
+
+    @Test
+    public void shouldTrimSystemPropertyPrefixesAndNoMore() {
+        System.setProperty("blah.a.x", "1");
+        System.setProperty("blah.b.y", "true");
+        System.setProperty("blah.cc.zzz", "three");
+        System.setProperty("bleep.d.d", "false");
+        Configuration configuration = SystemUtil.getSystemPropertiesConfiguration("blah",
true);
+        assertEquals(3, IteratorUtils.count(configuration.getKeys()));
+        assertEquals(1, configuration.getInt("a.x"));
+        assertTrue(configuration.getBoolean("b.y"));
+        assertEquals("three", configuration.getProperty("cc.zzz"));
+        assertFalse(configuration.containsKey("d") || configuration.containsKey("bleep.d"));
+        assertFalse(configuration.containsKey("d.d") || configuration.containsKey("bleep.d.d"));
+        System.clearProperty("blah.a.x");
+        System.clearProperty("blah.b.y");
+        System.clearProperty("blah.cc.zzz");
+        System.clearProperty("bleep.d.d");
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 25bc8b4..eb9a884 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -18,11 +18,12 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
-import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.gremlin.util.SystemUtil;
 
 import java.util.Collections;
 
@@ -31,8 +32,6 @@ import java.util.Collections;
  */
 public final class HadoopPools {
 
-    private static final Configuration EMPTY_CONFIGURATION = new BaseConfiguration();
-
     private HadoopPools() {
     }
 
@@ -61,17 +60,9 @@ public final class HadoopPools {
 
     public static GryoPool getGryoPool() {
         if (!INITIALIZED) {
-            /*if (null != System.getProperty("configuration", null)) {
-                try {
-                    HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() +
" has not been initialized, using the System properties configuration");
-                    initialize((Configuration) Serializer.deserializeObject(System.getProperty("configuration").getBytes()));
-                } catch (final Exception e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            } else {*/
-            HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not
been initialized, using the default pool");
-            initialize(EMPTY_CONFIGURATION);
-            //}
+            final Configuration configuration = SystemUtil.getSystemPropertiesConfiguration("gremlin",
true);
+            HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not
been initialized, using system properties configuration: " + ConfigurationUtils.toString(configuration));
+            initialize(configuration);
         }
         return GRYO_POOL;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d345100..81ebaaa 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -70,7 +70,6 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -115,10 +114,19 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                 UnshadedKryoShimService.class.getCanonicalName() :
                 HadoopPoolShimService.class.getCanonicalName();
         this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService);
-        this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
-                (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
"") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
-        this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
-                (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+        ///////////
+        final StringBuilder params = new StringBuilder();
+        this.sparkConfiguration.getKeys().forEachRemaining(key -> {
+            if (key.startsWith("gremlin") || key.startsWith("spark")) {
+                params.append(" -D").append("gremlin.").append(key).append("=").append(this.sparkConfiguration.getProperty(key));
+            }
+        });
+        if (params.length() > 0) {
+            this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+                    (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
"") + params.toString()).trim());
+            this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+                    (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"") + params.toString()).trim());
+        }
         KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration);
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 0789d6a..2b0efda 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -44,56 +44,43 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class UnshadedKryoShimService implements KryoShimService {
 
     private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
-
     private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
-
     private static volatile boolean initialized;
 
-    public UnshadedKryoShimService() {
-    }
-
     @Override
-    public Object readClassAndObject(final InputStream source) {
-
+    public Object readClassAndObject(final InputStream inputStream) {
         final LinkedBlockingQueue<Kryo> kryos = initialize();
-
         Kryo k = null;
         try {
             k = kryos.take();
-
-            return k.readClassAndObject(new Input(source));
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            return k.readClassAndObject(new Input(inputStream));
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e);
         } finally {
             try {
                 kryos.put(k);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+            } catch (final InterruptedException e) {
+                throw new IllegalStateException(e);
             }
         }
     }
 
     @Override
-    public void writeClassAndObject(final Object o, OutputStream sink) {
-
+    public void writeClassAndObject(final Object object, OutputStream outputStream) {
         final LinkedBlockingQueue<Kryo> kryos = initialize();
-
         Kryo k = null;
         try {
             k = kryos.take();
-
-            final Output kryoOutput = new Output(sink);
-
-            k.writeClassAndObject(kryoOutput, o);
-
+            final Output kryoOutput = new Output(outputStream);
+            k.writeClassAndObject(kryoOutput, object);
             kryoOutput.flush();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e);
         } finally {
             try {
                 kryos.put(k);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+            } catch (final InterruptedException e) {
+                throw new IllegalStateException(e);
             }
         }
     }
@@ -104,15 +91,15 @@ public class UnshadedKryoShimService implements KryoShimService {
     }
 
     @Override
-    public void applyConfiguration(final Configuration conf) {
-        initialize(conf);
+    public void applyConfiguration(final Configuration configuration) {
+        initialize(configuration);
     }
 
     private LinkedBlockingQueue<Kryo> initialize() {
         return initialize(new BaseConfiguration());
     }
 
-    private LinkedBlockingQueue<Kryo> initialize(final Configuration conf) {
+    private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration)
{
         // DCL is safe in this case due to volatility
         if (!initialized) {
             synchronized (UnshadedKryoShimService.class) {
@@ -120,14 +107,13 @@ public class UnshadedKryoShimService implements KryoShimService {
                     final SparkConf sparkConf = new SparkConf();
 
                     // Copy the user's IoRegistry from the param conf to the SparkConf we
just created
-                    final String regStr = conf.getString(GryoPool.CONFIG_IO_REGISTRY);
-                    if (null != regStr) { // SparkConf rejects null values with NPE, so this
has to be checked before set(...)
+                    final String regStr = configuration.getString(GryoPool.CONFIG_IO_REGISTRY,
null);
+                    if (null != regStr)  // SparkConf rejects null values with NPE, so this
has to be checked before set(...)
                         sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
-                    }
+
                     // Setting spark.serializer here almost certainly isn't necessary, but
it doesn't hurt
                     sparkConf.set(Constants.SPARK_SERIALIZER, IoRegistryAwareKryoSerializer.class.getCanonicalName());
-
-                    final String registrator = conf.getString(Constants.SPARK_KRYO_REGISTRATOR);
+                    final String registrator = configuration.getString(Constants.SPARK_KRYO_REGISTRATOR);
                     if (null != registrator) {
                         sparkConf.set(Constants.SPARK_KRYO_REGISTRATOR, registrator);
                         log.info("Copied " + Constants.SPARK_KRYO_REGISTRATOR + ": {}", registrator);
@@ -140,7 +126,7 @@ public class UnshadedKryoShimService implements KryoShimService {
 
                     // Setup a pool backed by our spark.serializer instance
                     // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
-                    final int poolSize = conf.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
+                    final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
                             GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
                     for (int i = 0; i < poolSize; i++) {
                         KRYOS.add(ioReg.newKryo());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/982d2207/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinGryoSerializerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinGryoSerializerIntegrateTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinGryoSerializerIntegrateTest.java
new file mode 100644
index 0000000..6523592
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinGryoSerializerIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphGryoSerializerProvider;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(SparkGremlinSuite.class)
+@GraphProviderClass(provider = SparkHadoopGraphGryoSerializerProvider.class, graph = HadoopGraph.class)
+public class SparkGremlinGryoSerializerIntegrateTest {
+}


Mime
View raw message