tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject incubator-tinkerpop git commit: various clean ups and organizations. This static Spark class was the way to go. Things now 'just work'. Finally.
Date Tue, 08 Dec 2015 00:37:00 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1027 8d7b8533c -> 5a3c5fbc6


various clean ups and organizations. This static Spark class was the way to go. Things now
'just work'. Finally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5a3c5fbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5a3c5fbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5a3c5fbc

Branch: refs/heads/TINKERPOP-1027
Commit: 5a3c5fbc67e0e77d54e95cee456579e9438ef92b
Parents: 8d7b853
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Mon Dec 7 17:36:55 2015 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Mon Dec 7 17:36:55 2015 -0700

----------------------------------------------------------------------
 .../spark/process/computer/SparkGraphComputer.java  |  3 ++-
 .../tinkerpop/gremlin/spark/structure/Spark.java    | 16 ++++++++--------
 .../gremlin/spark/structure/io/InputRDDFormat.java  |  2 +-
 .../structure/io/PersistedInputOutputRDDTest.java   | 12 ++++++++----
 4 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5a3c5fbc/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 079b0d6..2b85036 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -235,7 +235,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                Spark.tryAndClose(apacheConfiguration);
+                if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                    Spark.close();
             }
         }, exec);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5a3c5fbc/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 62b0ab3..cc1d2bf 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -23,7 +23,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import scala.collection.JavaConversions;
 
 import java.util.ArrayList;
@@ -48,7 +47,14 @@ public class Spark {
     public static void create(final Configuration configuration) {
         final SparkConf sparkConf = new SparkConf();
         configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
-        sparkConf.setAppName("Spark RDD Utility");
+        sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+        CONTEXT = SparkContext.getOrCreate(sparkConf);
+    }
+
+    public static void create(final String master) {
+        final SparkConf sparkConf = new SparkConf();
+        sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+        sparkConf.setMaster(master);
         CONTEXT = SparkContext.getOrCreate(sparkConf);
     }
 
@@ -103,10 +109,4 @@ public class Spark {
             CONTEXT.stop();
         NAME_TO_RDD.clear();
     }
-
-    public static void tryAndClose(final Configuration configuration) {
-        if (CONTEXT != null && !configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) {
-            Spark.close();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5a3c5fbc/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
index 9182b4c..d00af55 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
@@ -103,7 +103,7 @@ public final class InputRDDFormat extends InputFormat<NullWritable, VertexWritab
 
                 @Override
                 public void close() throws IOException {
-                    Spark.tryAndClose(ConfUtil.makeApacheConfiguration(hadoopConfiguration));
+
                 }
             };
         } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5a3c5fbc/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index a85200a..9ad50a9 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -56,6 +56,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
     public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        Spark.create("local[4]");
         final String rddName = "target/test-output/" + UUID.randomUUID();
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
@@ -76,7 +77,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                                 "gremlin-groovy",
                                 "g.V()").create(graph)).submit().get();
         ////////
-        Spark.create(configuration);
+        Spark.create("local[4]");
         assertFalse(Spark.hasRDD(rddName));
         Spark.close();
     }
@@ -103,7 +104,6 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                                 "gremlin-groovy",
                                 "g.V()").create(graph)).submit().get();
         ////////
-        Spark.create(configuration);
         assertTrue(Spark.hasRDD(rddName));
         ///////
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
@@ -123,6 +123,8 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
     public void testBulkLoaderVertexProgramChain() throws Exception {
+        Spark.create("local[4]");
+
         final String rddName = "target/test-output/" + UUID.randomUUID().toString();
         final Configuration readConfiguration = new BaseConfiguration();
         readConfiguration.setProperty("spark.master", "local[4]");
@@ -151,7 +153,6 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                 .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
                 .submit().get();
         ////
-        Spark.create(readConfiguration);
         assertTrue(Spark.hasRDD(rddName));
         ////
         final Graph graph = TinkerGraph.open();
@@ -168,6 +169,8 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
     public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception {
+        Spark.create("local[4]");
+
         final String rddName = "target/test-output/" + UUID.randomUUID().toString();
         final Configuration readConfiguration = new BaseConfiguration();
         readConfiguration.setProperty("spark.master", "local[4]");
@@ -209,6 +212,8 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
     public void testComplexChain() throws Exception {
+        Spark.create("local[4]");
+
         final String rddName = "target/test-output/graphRDD";
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
@@ -220,7 +225,6 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Spark.create(configuration);
         Graph graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
         GraphTraversalSource g = graph.traversal();


Mime
View raw message