Some organization and cleanup. Stuff is looking SOLID. Time to run full integration tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/26dcf3c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/26dcf3c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/26dcf3c9
Branch: refs/heads/TINKERPOP3-1014
Commit: 26dcf3c943bf1328747cd78d6fd5a814dfe6a5da
Parents: ddbbec0
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Dec 2 13:11:29 2015 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Dec 3 12:47:52 2015 -0700
----------------------------------------------------------------------
.../process/computer/SparkContextHelper.java | 39 +++++++++++
.../process/computer/SparkGraphComputer.java | 15 ++---
.../spark/structure/io/InputRDDFormat.java | 8 +--
.../process/computer/LocalPropertyTest.java | 4 +-
.../computer/SparkHadoopGraphProvider.java | 38 +++--------
.../spark/structure/io/ClassicInputRDD.java | 39 -----------
.../spark/structure/io/GratefulInputRDD.java | 50 --------------
.../spark/structure/io/InputOutputRDDTest.java | 1 -
.../spark/structure/io/InputRDDTest.java | 1 -
.../spark/structure/io/ModernInputRDD.java | 41 ------------
.../spark/structure/io/OutputRDDTest.java | 1 -
.../io/PersistedInputOutputRDDTest.java | 10 +--
.../spark/structure/io/TheCrewInputRDD.java | 39 -----------
.../spark/structure/io/ToyGraphInputRDD.java | 69 ++++++++++++++++++++
14 files changed, 134 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java
new file mode 100644
index 0000000..ed553c5
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+
+/** Static helper for closing a {@link JavaSparkContext} unless the configuration asks for it to persist.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkContextHelper {
+
+ private SparkContextHelper() {
+ // Non-instantiable: this class only exposes static methods.
+ }
+ /** Closes {@code context} unless {@link Constants#GREMLIN_SPARK_PERSIST_CONTEXT} is {@code true} in {@code configuration} (absent means {@code false}); a {@code null} context is ignored, and {@code configuration} must be non-null whenever {@code context} is. */
+ public static void tryToCloseContext(final JavaSparkContext context, final Configuration
configuration) {
+ if (context != null && !configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
+ context.close(); // NOTE(review): replaces the stop() call SparkGraphComputer used before this refactor; assumed equivalent — confirm against the Spark API.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index e7566d5..c20f1b0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -228,8 +228,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration,
this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
- if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
- sparkContext.stop();
+ SparkContextHelper.tryToCloseContext(sparkContext, apacheConfiguration);
}
}, exec);
}
@@ -268,14 +267,14 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
* Execution rather than applying the entire configuration.
*/
final String[] validPropertyNames = {
- "spark.job.description",
- "spark.jobGroup.id",
- "spark.job.interruptOnCancel",
- "spark.scheduler.pool"
+ "spark.job.description",
+ "spark.jobGroup.id",
+ "spark.job.interruptOnCancel",
+ "spark.scheduler.pool"
};
- for (String propertyName: validPropertyNames){
- if (sparkConfiguration.contains(propertyName)){
+ for (String propertyName : validPropertyNames) {
+ if (sparkConfiguration.contains(propertyName)) {
String propertyValue = sparkConfiguration.get(propertyName);
this.logger.info("Setting Thread Local SparkContext Property - "
+ propertyName + " : " + propertyValue);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
index 3952c66..d1a198e 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
@@ -31,6 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkContextHelper;
import scala.Tuple2;
import java.io.IOException;
@@ -70,8 +71,8 @@ public final class InputRDDFormat extends InputFormat<NullWritable, VertexWritab
final SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(UUID.randomUUID().toString());
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(),
entry.getValue()));
- InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance();
- JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+ final InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance();
+ final JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
final Iterator<Tuple2<Object, VertexWritable>> iterator = inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()),
javaSparkContext).toLocalIterator();
return new RecordReader<NullWritable, VertexWritable>() {
@Override
@@ -101,8 +102,7 @@ public final class InputRDDFormat extends InputFormat<NullWritable,
VertexWritab
@Override
public void close() throws IOException {
- if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
- javaSparkContext.close();
+ SparkContextHelper.tryToCloseContext(javaSparkContext, ConfUtil.makeApacheConfiguration(hadoopConfiguration));
}
};
} catch (final ClassNotFoundException | InstantiationException | IllegalAccessException
e) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
index e0fe796..671bee8 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
@@ -25,7 +25,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkStatusTracker;
-import org.apache.spark.serializer.KryoSerializer;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
@@ -36,6 +35,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;
@@ -53,7 +53,7 @@ public class LocalPropertyTest {
final String rddName = "target/test-output/" + UUID.randomUUID();
final Configuration configuration = new BaseConfiguration();
configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+ configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 2328176..2cfeea3 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -24,11 +24,9 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.structure.io.ClassicInputRDD;
-import org.apache.tinkerpop.gremlin.spark.structure.io.GratefulInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat;
-import org.apache.tinkerpop.gremlin.spark.structure.io.ModernInputRDD;
-import org.apache.tinkerpop.gremlin.spark.structure.io.TheCrewInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import java.util.Map;
@@ -46,36 +44,16 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider
{
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?>
test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
final Map<String, Object> config = super.getBaseConfiguration(graphName, test,
testMethodName, loadGraphWith);
if (null != loadGraphWith) {
- if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) {
- if (RANDOM.nextBoolean()) {
- config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName());
- config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
- }
- } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) {
- if (RANDOM.nextBoolean()) {
- config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName());
- config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
- }
- } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) {
- if (RANDOM.nextBoolean()) {
- config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName());
- config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
- }
- } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) {
- if (RANDOM.nextBoolean()) {
- config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName());
- config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
- }
+ if (RANDOM.nextBoolean()) {
+ config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+ config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
+ config.put(ToyGraphInputRDD.GREMLIN_SPARK_TOY_GRAPH, loadGraphWith.toString());
+ config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
}
}
/// spark configuration
config.put("spark.master", "local[4]");
- //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+ config.put("spark.serializer", GryoSerializer.class.getCanonicalName());
config.put("spark.kryo.registrationRequired", true);
return config;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
deleted file mode 100644
index 4512b61..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ClassicInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
- return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(),
VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
deleted file mode 100644
index 396aa75..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GratefulInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
- try {
- final Graph graph = TinkerGraph.open();
- graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile());
- return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(),
VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
index ea62114..50a43bc 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.spark.serializer.KryoSerializer;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
index 2cbfd66..b64139c 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.spark.serializer.KryoSerializer;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
deleted file mode 100644
index 849e3e6..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ModernInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration,
final JavaSparkContext sparkContext) {
- return sparkContext.
- parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(),
VertexWritable::new))).
- mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
index f9b6f39..60790e7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.spark.serializer.KryoSerializer;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 6aeb864..1de2b47 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
@@ -37,6 +36,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.IoCore;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
@@ -58,7 +58,7 @@ public class PersistedInputOutputRDDTest {
final String rddName = "target/test-output/" + UUID.randomUUID();
final Configuration configuration = new BaseConfiguration();
configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+ configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
@@ -87,7 +87,7 @@ public class PersistedInputOutputRDDTest {
final String rddName = "target/test-output/" + UUID.randomUUID();
final Configuration configuration = new BaseConfiguration();
configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+ configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
@@ -129,7 +129,7 @@ public class PersistedInputOutputRDDTest {
final String rddName = "target/test-output/" + UUID.randomUUID().toString();
final Configuration readConfiguration = new BaseConfiguration();
readConfiguration.setProperty("spark.master", "local[4]");
- readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+ readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
@@ -176,7 +176,7 @@ public class PersistedInputOutputRDDTest {
final String rddName = "target/test-output/" + UUID.randomUUID().toString();
final Configuration readConfiguration = new BaseConfiguration();
readConfiguration.setProperty("spark.master", "local[4]");
- readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+ readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
deleted file mode 100644
index ff5a274..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TheCrewInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/26dcf3c9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
new file mode 100644
index 0000000..ed97c04
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputRDD} that builds the graph RDD from one of the TinkerPop toy graphs, chosen by the
+ * {@link #GREMLIN_SPARK_TOY_GRAPH} configuration property.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ToyGraphInputRDD implements InputRDD {
+
+    /**
+     * Configuration key whose value selects the toy graph; it must equal the {@code toString()} of
+     * {@link LoadGraphWith.GraphData#MODERN}, {@code CLASSIC}, {@code CREW} or {@code GRATEFUL}.
+     */
+    public static final String GREMLIN_SPARK_TOY_GRAPH = "gremlin.spark.toyGraph";
+
+    /**
+     * Loads the configured toy graph and parallelizes its vertices, wrapped as {@link VertexWritable}s
+     * and keyed by vertex id.
+     *
+     * @throws IllegalArgumentException if {@link #GREMLIN_SPARK_TOY_GRAPH} is unset or names no known toy graph
+     * @throws IllegalStateException    if the Grateful Dead resource cannot be found or read
+     */
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final List<Vertex> vertices = loadVertices(configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH));
+        return sparkContext.parallelize(vertices.stream().map(VertexWritable::new).collect(Collectors.toList()))
+                .mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+
+    /**
+     * Returns the vertices of the toy graph named by {@code toyGraph}. The constant is compared first so a
+     * missing (null) property produces an {@link IllegalArgumentException} rather than a NullPointerException.
+     */
+    private static List<Vertex> loadVertices(final Object toyGraph) {
+        if (LoadGraphWith.GraphData.MODERN.toString().equals(toyGraph))
+            return IteratorUtils.list(TinkerFactory.createModern().vertices());
+        else if (LoadGraphWith.GraphData.CLASSIC.toString().equals(toyGraph))
+            return IteratorUtils.list(TinkerFactory.createClassic().vertices());
+        else if (LoadGraphWith.GraphData.CREW.toString().equals(toyGraph))
+            return IteratorUtils.list(TinkerFactory.createTheCrew().vertices());
+        else if (LoadGraphWith.GraphData.GRATEFUL.toString().equals(toyGraph))
+            return loadGratefulDeadVertices();
+        else
+            throw new IllegalArgumentException("No legal toy graph was provided to load: " + toyGraph);
+    }
+
+    /**
+     * Reads the Grateful Dead graph from the Gryo resource next to {@link GryoResourceAccess}. The resource is
+     * opened as a stream because {@code URL.getFile()} does not give a readable path when it lives inside a jar.
+     */
+    private static List<Vertex> loadGratefulDeadVertices() {
+        final Graph graph = TinkerGraph.open();
+        try (final InputStream stream = GryoResourceAccess.class.getResourceAsStream("grateful-dead.kryo")) {
+            if (null == stream)
+                throw new IllegalStateException("Unable to find the grateful-dead.kryo resource");
+            graph.io(GryoIo.build()).reader().create().readGraph(stream, graph);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return IteratorUtils.list(graph.vertices());
+    }
+}
|