tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [04/13] incubator-tinkerpop git commit: I have the SparkIntegrationTestSuite now testing either from Gryo FileInputFormat, GraphSON FileInputFormat, or an InputRDD. This gives us super coverage and proves that InputRDD (bypassing Hadoop) is working as ex
Date Thu, 03 Dec 2015 19:49:23 GMT
I have the SparkIntegrationTestSuite now testing from either a Gryo FileInputFormat, a GraphSON
FileInputFormat, or an InputRDD. This gives us excellent coverage and proves that InputRDD (bypassing
Hadoop) is working as expected. I also updated some other tests that used KryoSerializer so that they
use GryoSerializer instead, now that I have learned how to deal with Scala's WrappedArray class. That was
insane. This is really good stuff.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ddbbec08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ddbbec08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ddbbec08

Branch: refs/heads/master
Commit: ddbbec084635a7e764632713a07b0c532a614e4c
Parents: 85eb63d
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Dec 2 12:02:09 2015 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Dec 3 12:47:52 2015 -0700

----------------------------------------------------------------------
 .../structure/hdfs/HadoopElementIterator.java   |  6 ++-
 .../computer/payload/MessagePayload.java        |  5 +-
 .../computer/payload/ViewIncomingPayload.java   |  6 +--
 .../computer/payload/ViewOutgoingPayload.java   | 12 +++--
 .../spark/structure/io/gryo/GryoSerializer.java |  6 +++
 .../io/gryo/WrappedArraySerializer.java         | 46 ++++++++++++++++++
 .../computer/SparkHadoopGraphProvider.java      | 40 ++++++++++++++--
 .../spark/structure/io/ClassicInputRDD.java     | 39 +++++++++++++++
 .../spark/structure/io/GratefulInputRDD.java    | 50 ++++++++++++++++++++
 .../spark/structure/io/InputOutputRDDTest.java  |  3 +-
 .../spark/structure/io/InputRDDTest.java        |  5 +-
 .../spark/structure/io/ModernInputRDD.java      | 41 ++++++++++++++++
 .../spark/structure/io/OutputRDDTest.java       |  3 +-
 .../spark/structure/io/TheCrewInputRDD.java     | 39 +++++++++++++++
 14 files changed, 285 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
index f9ffea2..45f3c55 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -56,7 +58,9 @@ public abstract class HadoopElementIterator<E extends Element> implements
Iterat
             final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
             if (inputFormat instanceof FileInputFormat) {
                 if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
-                    return; // there is not input location and thus, no data (empty graph)
+                    return; // there is no input location and thus, no data (empty graph)
+                if (!FileSystem.get(configuration).exists(new Path(this.graph.configuration().getInputLocation())))
+                    return; // there is no data at the input location (empty graph)
                 configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation());
             }
             final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration,
new JobID(UUID.randomUUID().toString(), 1)));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
index 09e2599..f32ec44 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
@@ -23,7 +23,10 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
  */
 public final class MessagePayload<M> implements Payload {
 
-    private final M message;
+    private M message;
+
+    private MessagePayload() {
+    }
 
     public MessagePayload(final M message) {
         this.message = message;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 911fc7b..a2c9205 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -31,11 +31,11 @@ import java.util.List;
 public final class ViewIncomingPayload<M> implements Payload {
 
     private List<DetachedVertexProperty<Object>> view = null;
-    private final List<M> incomingMessages;
+    private List<M> incomingMessages;
 
 
-    public ViewIncomingPayload() {
-        this.incomingMessages = null;
+    private ViewIncomingPayload() {
+
     }
 
     public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
index fc4aeed..20c8e09 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -28,10 +28,14 @@ import java.util.List;
  */
 public final class ViewOutgoingPayload<M> implements Payload {
 
-    private final List<DetachedVertexProperty<Object>> view;
-    private final List<Tuple2<Object,M>> outgoingMessages;
+    private List<DetachedVertexProperty<Object>> view;
+    private List<Tuple2<Object, M>> outgoingMessages;
 
-    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view,
final List<Tuple2<Object,M>> outgoingMessages) {
+    private ViewOutgoingPayload() {
+
+    }
+
+    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view,
final List<Tuple2<Object, M>> outgoingMessages) {
         this.view = view;
         this.outgoingMessages = outgoingMessages;
     }
@@ -40,7 +44,7 @@ public final class ViewOutgoingPayload<M> implements Payload {
         return new ViewPayload(this.view);
     }
 
-    public List<Tuple2<Object,M>> getOutgoingMessages() {
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
         return this.outgoingMessages;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7202892..29fde9f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -41,6 +41,8 @@ import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
 import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.util.Collections;
@@ -79,11 +81,15 @@ public final class GryoSerializer extends Serializer {
                     try {
                         builder.addCustom(SerializableWritable.class, new JavaSerializer())
                                 .addCustom(Tuple2.class, new JavaSerializer())
+                                .addCustom(Tuple2[].class, new JavaSerializer())
+                                .addCustom(Tuple3.class, new JavaSerializer())
+                                .addCustom(Tuple3[].class, new JavaSerializer())
                                 .addCustom(CompressedMapStatus.class, new JavaSerializer())
                                 .addCustom(HttpBroadcast.class, new JavaSerializer())
                                 .addCustom(PythonBroadcast.class, new JavaSerializer())
                                 .addCustom(BoxedUnit.class, new JavaSerializer())
                                 .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"),
new JavaSerializer())
+                                .addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer())
                                 .addCustom(MessagePayload.class)
                                 .addCustom(ViewIncomingPayload.class)
                                 .addCustom(ViewOutgoingPayload.class)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
new file mode 100644
index 0000000..0e9f03f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.WrappedArray;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>>
{
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final WrappedArray<T> iterable)
{
+        kryo.writeClassAndObject(output,new ArrayList<>(JavaConversions.asJavaList(iterable)));
+    }
+
+    @Override
+    public WrappedArray<T> read(final Kryo kryo, final Input input, final Class<WrappedArray<T>>
aClass) {
+        return new WrappedArray.ofRef<>((T[]) ((List<T>) kryo.readClassAndObject(input)).toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index c81ea92..2328176 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -20,12 +20,19 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 
 import org.apache.tinkerpop.gremlin.GraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.structure.io.ClassicInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.GratefulInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat;
+import org.apache.tinkerpop.gremlin.spark.structure.io.ModernInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.TheCrewInputRDD;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
 import java.util.Map;
+import java.util.Random;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -33,13 +40,41 @@ import java.util.Map;
 @GraphProvider.Descriptor(computer = SparkGraphComputer.class)
 public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
 
+    private static final Random RANDOM = new Random();
+
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?>
test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         final Map<String, Object> config = super.getBaseConfiguration(graphName, test,
testMethodName, loadGraphWith);
-        config.put("mapreduce.job.reduces", 4);
+        if (null != loadGraphWith) {
+            if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            }
+        }
         /// spark configuration
         config.put("spark.master", "local[4]");
-        // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
         config.put("spark.kryo.registrationRequired", true);
         return config;
@@ -49,5 +84,4 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider
{
     public GraphTraversalSource traversal(final Graph graph) {
         return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
new file mode 100644
index 0000000..4512b61
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ClassicInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
+        return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(),
VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
new file mode 100644
index 0000000..396aa75
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GratefulInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
+        try {
+            final Graph graph = TinkerGraph.open();
+            graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile());
+            return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(),
VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
index 3691aba..ea62114 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -41,7 +42,7 @@ public class InputOutputRDDTest {
     public void shouldReadFromWriteToArbitraryRDD() throws Exception {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
index 98a2b9f..2cbfd66 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -42,7 +43,7 @@ public class InputRDDTest {
     public void shouldReadFromArbitraryRDD() {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
@@ -58,7 +59,7 @@ public class InputRDDTest {
     public void shouldSupportHadoopGraphOLTP() {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
new file mode 100644
index 0000000..849e3e6
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ModernInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
+        return sparkContext.
+                parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(),
VertexWritable::new))).
+                mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
index 10eecb3..f9b6f39 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -43,7 +44,7 @@ public class OutputRDDTest {
     public void shouldWriteToArbitraryRDD() throws Exception {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ddbbec08/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
new file mode 100644
index 0000000..ff5a274
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TheCrewInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
+        return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(),
VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}


Mime
View raw message