tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [02/17] incubator-tinkerpop git commit: added Storage to gremlin-core. Storage is an interface that OLAP systems can implement. It provides ls(), rmr(), rm(), etc. type methods that make it easy for users to interact (via a common interface) with the unde
Date Fri, 08 Jan 2016 14:20:27 GMT
added Storage to gremlin-core. Storage is an interface that OLAP systems can implement. It provides ls(), rmr(), rm(), and similar methods that make it easy for users to interact (via a common interface) with the underlying persistence system. Now both HDFS and Spark provide their own Storage implementations and TADA. Really pretty.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/58d92407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/58d92407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/58d92407

Branch: refs/heads/master
Commit: 58d9240764cd6e1f3779097966c53058264e00e6
Parents: f3ebed0
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Dec 9 13:46:43 2015 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Dec 9 13:46:43 2015 -0700

----------------------------------------------------------------------
 .../peerpressure/ClusterCountMapReduce.java     |   7 +-
 .../tinkerpop/gremlin/structure/io/Storage.java |  82 ++++++++
 .../gremlin/structure/util/StringFactory.java   |  18 +-
 .../hadoop/groovy/plugin/HadoopLoader.groovy    | 138 --------------
 .../groovy/plugin/HadoopGremlinPlugin.java      |   7 +-
 .../hadoop/structure/io/FileSystemStorage.java  | 186 +++++++++++++++++++
 .../groovy/plugin/GraphMemoryHDFSCheck.java     |  64 +++++++
 .../hadoop/groovy/plugin/HadoopPluginSuite.java |   2 +-
 .../spark/groovy/plugin/SparkLoader.groovy      |  68 -------
 .../spark/groovy/plugin/SparkGremlinPlugin.java |   5 +-
 .../spark/structure/io/PersistedInputRDD.java   |   8 +-
 .../spark/structure/io/SparkContextStorage.java | 124 +++++++++++++
 .../gremlin/spark/AbstractSparkTest.java        |  30 +++
 .../groovy/plugin/SparkGremlinPluginTest.java   |  10 +-
 .../structure/io/GraphMemorySparkTest.java      |  75 ++++++++
 15 files changed, 593 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
index 1112a46..d343e8e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
@@ -64,7 +64,7 @@ public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject,
 
     @Override
     public boolean doStage(final Stage stage) {
-        return true;
+        return !stage.equals(Stage.COMBINE);
     }
 
     @Override
@@ -76,11 +76,6 @@ public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject,
     }
 
     @Override
-    public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
-        this.reduce(key, values, emitter);
-    }
-
-    @Override
     public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
         final Set<Serializable> set = new HashSet<>();
         values.forEachRemaining(set::add);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
new file mode 100644
index 0000000..1f1bcf4
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure.io;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Storage {
+
+    public List<String> ls();
+
+    public List<String> ls(final String location);
+
+    public boolean mkdir(final String location);
+
+    public boolean cp(final String fromLocation, final String toLocation);
+
+    public boolean exists(final String location);
+
+    public boolean rm(final String location);
+
+    public boolean rmr(final String location);
+
+    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass);
+
+    public default Iterator<Object> head(final String location) {
+        return this.head(location, Object.class);
+    }
+
+    public default Iterator<Object> head(final String location, final int totalLines) {
+        return this.head(location, totalLines, Object.class);
+    }
+
+    public default <V> Iterator<V> head(final String location, final Class<V> objectClass) {
+        return this.head(location, Integer.MAX_VALUE, objectClass);
+    }
+
+  /*
+
+        FileSystem.metaClass.copyToLocal = { final String from, final String to ->
+            return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
+        }
+
+        FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
+            return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
+        }
+
+        FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
+            final FileSystem fs = (FileSystem) delegate;
+            final FileSystem local = FileSystem.getLocal(new Configuration());
+            final FSDataOutputStream outA = local.create(new Path(to));
+
+            HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
+                final FSDataInputStream inA = fs.open(it);
+                IOUtils.copyBytes(inA, outA, 8192);
+                inA.close();
+            }
+            outA.close();
+        }
+
+     */
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index e716a60..9ae8116 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -18,18 +18,18 @@
  */
 package org.apache.tinkerpop.gremlin.structure.util;
 
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -75,6 +75,7 @@ public final class StringFactory {
     private static final String EMPTY_PROPERTY = "p[empty]";
     private static final String EMPTY_VERTEX_PROPERTY = "vp[empty]";
     private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+    private static final String STORAGE = "storage";
 
     private static final String featuresStartWith = "supports";
     private static final int prefixLength = featuresStartWith.length();
@@ -237,4 +238,9 @@ public final class StringFactory {
     public static String traversalString(final Traversal.Admin<?, ?> traversal) {
         return traversal.getSteps().toString();
     }
+
+    public static String storageString(final String internalString) {
+        return STORAGE + L_BRACKET + internalString + R_BRACKET;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy b/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
deleted file mode 100644
index 616c2f0..0000000
--- a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.*
-import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.io.Text
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritableIterator
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-class HadoopLoader {
-
-    private static final String SPACE = " ";
-    private static final String D_SPACE = "(D) ";
-
-    public static void load() {
-
-        FileStatus.metaClass.toString = {
-            StringBuilder s = new StringBuilder();
-            s.append(((FileStatus) delegate).getPermission()).append(SPACE)
-            s.append(((FileStatus) delegate).getOwner()).append(SPACE);
-            s.append(((FileStatus) delegate).getGroup()).append(SPACE);
-            s.append(((FileStatus) delegate).getLen()).append(SPACE);
-            if (((FileStatus) delegate).isDir())
-                s.append(D_SPACE);
-            s.append(((FileStatus) delegate).getPath().getName());
-            return s.toString();
-        }
-
-        FileSystem.metaClass.ls = { String path ->
-            if (null == path || path.equals("/")) path = ((FileSystem) delegate).getHomeDirectory().toString();
-            return ((FileSystem) delegate).globStatus(new Path(path + "/*")).collect {
-                it.toString()
-            };
-        }
-
-        FileSystem.metaClass.mkdir = { String path ->
-            ((FileSystem) delegate).mkdirs(new Path(path));
-        }
-
-        FileSystem.metaClass.cp = { final String from, final String to ->
-            return FileUtil.copy(((FileSystem) delegate), new Path(from), ((FileSystem) delegate), new Path(to), false, new Configuration());
-        }
-
-        FileSystem.metaClass.exists = { final String path ->
-            return ((FileSystem) delegate).exists(new Path(path));
-        }
-
-        FileSystem.metaClass.rm = { final String path ->
-            HDFSTools.globDelete((FileSystem) delegate, path, false);
-        }
-
-        FileSystem.metaClass.rmr = { final String path ->
-            HDFSTools.globDelete((FileSystem) delegate, path, true);
-        }
-
-        FileSystem.metaClass.copyToLocal = { final String from, final String to ->
-            return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
-        }
-
-        FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
-            return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
-        }
-
-        FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
-            final FileSystem fs = (FileSystem) delegate;
-            final FileSystem local = FileSystem.getLocal(new Configuration());
-            final FSDataOutputStream outA = local.create(new Path(to));
-
-            HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
-                final FSDataInputStream inA = fs.open(it);
-                IOUtils.copyBytes(inA, outA, 8192);
-                inA.close();
-            }
-            outA.close();
-        }
-
-        FileSystem.metaClass.head = { final String path, final int totalLines ->
-            return headMaker((FileSystem) delegate, path, totalLines, Text.class);
-        }
-
-        FileSystem.metaClass.head = { final String path ->
-            return headMaker((FileSystem) delegate, path, Integer.MAX_VALUE, Text.class);
-        }
-
-        FileSystem.metaClass.head = {
-            final String path, final Class<org.apache.hadoop.io.Writable> writableClass ->
-                return headMaker((FileSystem) delegate, path, Integer.MAX_VALUE, writableClass);
-        }
-
-        FileSystem.metaClass.head = {
-            final String path, final int totalLines, final Class<org.apache.hadoop.io.Writable> writableClass ->
-                return headMaker((FileSystem) delegate, path, totalLines, writableClass);
-        }
-
-        /*FileSystem.metaClass.unzip = { final String from, final String to, final boolean deleteZip ->
-            HDFSTools.decompressPath((FileSystem) delegate, from, to, Tokens.BZ2, deleteZip);
-        }*/
-
-    }
-
-    private static Iterator headMaker(
-            final FileSystem fs,
-            final String path, final int totalLines, final Class<org.apache.hadoop.io.Writable> writableClass) {
-        if (writableClass.equals(ObjectWritable.class))
-            return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
-        else if (writableClass.equals(VertexWritable.class))
-            return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
-        else
-            return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index d62b4e5..b4f5cd2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceG
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
@@ -86,10 +87,10 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
             ///
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));
-            pluginAcceptor.eval(HadoopLoader.class.getCanonicalName() + ".load()");
+            //pluginAcceptor.eval(HadoopLoader.class.getCanonicalName() + ".load()");
 
-            pluginAcceptor.addBinding("hdfs", FileSystem.get(new Configuration()));
-            pluginAcceptor.addBinding("local", FileSystem.getLocal(new Configuration()));
+            pluginAcceptor.addBinding("hdfs", new FileSystemStorage(FileSystem.get(new Configuration())));
+            pluginAcceptor.addBinding("local", new FileSystemStorage(FileSystem.getLocal(new Configuration())));
             if (null == System.getenv(Constants.HADOOP_GREMLIN_LIBS))
                 HadoopGraph.LOGGER.warn("Be sure to set the environmental variable: " + Constants.HADOOP_GREMLIN_LIBS);
             else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
new file mode 100644
index 0000000..56dfe52
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FileSystemStorage implements Storage {
+
+    private static final String SPACE = " ";
+    private static final String D_SPACE = "(D) ";
+
+    private final FileSystem fs;
+
+    public FileSystemStorage(final FileSystem fileSystem) {
+        this.fs = fileSystem;
+    }
+
+    private static String fileStatusString(final FileStatus status) {
+        StringBuilder s = new StringBuilder();
+        s.append(status.getPermission()).append(" ");
+        s.append(status.getOwner()).append(SPACE);
+        s.append(status.getGroup()).append(SPACE);
+        s.append(status.getLen()).append(SPACE);
+        if (status.isDir())
+            s.append(D_SPACE);
+        s.append(status.getPath().getName());
+        return s.toString();
+    }
+
+    @Override
+    public List<String> ls() {
+        return this.ls("/");
+    }
+
+    @Override
+    public List<String> ls(final String location) {
+        try {
+            final String newLocation;
+            newLocation = location.equals("/") ? this.fs.getHomeDirectory().toString() : location;
+            return Stream.of(this.fs.globStatus(new Path(newLocation + "/*"))).map(FileSystemStorage::fileStatusString).collect(Collectors.toList());
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean mkdir(final String location) {
+        try {
+            return this.fs.mkdirs(new Path(location));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean cp(final String fromLocation, final String toLocation) {
+        try {
+            return FileUtil.copy(this.fs, new Path(fromLocation), this.fs, new Path(toLocation), false, new Configuration());
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean exists(final String location) {
+        try {
+            return this.fs.exists(new Path(location));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean rm(final String location) {
+        try {
+            return HDFSTools.globDelete(this.fs, location, false);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean rmr(final String location) {
+        try {
+            return HDFSTools.globDelete(this.fs, location, true);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
+        return headMaker(this.fs, location, totalLines, (Class<? extends Writable>) objectClass);
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.storageString(this.fs.toString());
+    }
+
+    private static Iterator headMaker(final FileSystem fs, final String path, final int totalLines, final Class<? extends Writable> writableClass) {
+        try {
+            if (writableClass.equals(ObjectWritable.class))
+                return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
+            else if (writableClass.equals(VertexWritable.class))
+                return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
+            else
+                return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /////////
+
+    public void copyToLocal(final String fromLocation, final String toLocation) {
+        try {
+            this.fs.copyToLocalFile(new Path(fromLocation), new Path(toLocation));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void copyFromLocal(final String fromLocation, final String toLocation) {
+        try {
+            this.fs.copyFromLocalFile(new Path(fromLocation), new Path(toLocation));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public void mergeToLocal(final String fromLocation, final String toLocation) {
+        try {
+            final FileSystem local = FileSystem.getLocal(new Configuration());
+            final FSDataOutputStream outA = local.create(new Path(toLocation));
+            for (final Path path : HDFSTools.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
+                final FSDataInputStream inA = fs.open(path);
+                IOUtils.copyBytes(inA, outA, 8192);
+                inA.close();
+            }
+            outA.close();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
new file mode 100644
index 0000000..d47ce43
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphMemoryHDFSCheck extends AbstractGremlinTest {
+
+    @Test
+    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
+    public void shouldPersistGraphAndMemory() throws Exception {
+        final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
+        /////
+        final Storage storage = new FileSystemStorage(FileSystem.get(ConfUtil.makeHadoopConfiguration(graph.configuration())));
+        // TEST GRAPH PERSISTENCE
+        assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))));
+        assertEquals(6, result.graph().traversal().V().count().next().longValue());
+        assertEquals(0, result.graph().traversal().E().count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+        assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue());
+        /////
+        // TEST MEMORY PERSISTENCE
+        assertEquals(2, (int) result.memory().get("clusterCount"));
+        assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
+ //       System.out.println(IteratorUtils.list(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
+//        assertEquals(1, IteratorUtils.count(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
+        assertEquals(2, storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")).next());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
index 6c4cc20..7dc8143 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
@@ -29,6 +29,6 @@ import org.junit.runners.model.RunnerBuilder;
  */
 public class HadoopPluginSuite extends AbstractGremlinSuite {
     public HadoopPluginSuite(final Class<?> klass, final RunnerBuilder builder) throws InitializationError {
-        super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class}, true, TraversalEngine.Type.COMPUTER);
+        super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, true, TraversalEngine.Type.COMPUTER);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
deleted file mode 100644
index 53d385f..0000000
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.groovy.plugin
-
-import org.apache.spark.rdd.RDD
-import org.apache.tinkerpop.gremlin.spark.structure.Spark
-import scala.collection.JavaConversions
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-class SparkLoader {
-
-    public static void load() {
-
-        Spark.metaClass.static.ls = {
-            final List<String> rdds = new ArrayList<>();
-            for (final RDD<?> rdd : Spark.getRDDs()) {
-                rdds.add(rdd.name() + " [" + rdd.getStorageLevel().description() + "]")
-            }
-            return rdds;
-        }
-
-        Spark.metaClass.static.rm = { final String rddName ->
-            for (final RDD<?> rdd : Spark.getRDDs()) {
-                if (rdd.name().matches(rddName.replace(".", "\\.").replace("*", ".*")))
-                    Spark.removeRDD(rdd.name());
-            }
-        }
-
-        Spark.metaClass.static.head = { final String rddName ->
-            return Spark.head(rddName, Integer.MAX_VALUE);
-        }
-
-        Spark.metaClass.static.head = { final String rddName, final int totalLines ->
-            final List<Object> data = new ArrayList<>();
-            final Iterator<?> itty = JavaConversions.asJavaIterator(Spark.getRDD(rddName).toLocalIterator());
-            for (int i = 0; i < totalLines; i++) {
-                if (itty.hasNext())
-                    data.add(itty.next());
-                else
-                    break;
-            }
-            return data;
-        }
-
-        Spark.metaClass.static.describe = { final String rddName ->
-            return Spark.getRDD(rddName).toDebugString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
index 9351a1e..7711435 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.PluginInitializationException;
 import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
 
 import java.util.HashSet;
 import java.util.Optional;
@@ -43,6 +44,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
         add("import org.apache.log4j.*");
         add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + Spark.class.getPackage().getName() + DOT_STAR);
+        add(IMPORT_SPACE + SparkContextStorage.class.getPackage().getName() + DOT_STAR);
     }};
 
     @Override
@@ -56,8 +58,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
         try {
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", SparkGraphComputer.class.getName()));
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.ERROR)", MetricsSystem.class.getName()));
-            pluginAcceptor.eval("spark = Spark");
-            pluginAcceptor.eval(SparkLoader.class.getCanonicalName() + ".load()");
+            pluginAcceptor.eval("spark = SparkContextStorage.open()");
         } catch (final Exception e) {
             throw new PluginInitializationException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
index 52d18f1..55bf53b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -36,8 +36,9 @@ public final class PersistedInputRDD implements InputRDD {
     public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
         if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
-        final String graphRDDName = Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
         Spark.create(sparkContext.sc());
+        final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+        final String graphRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getGraphLocation(inputLocation);
         return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(graphRDDName).toJavaRDD());
     }
 
@@ -45,8 +46,9 @@ public final class PersistedInputRDD implements InputRDD {
     public <K, V> JavaPairRDD<K, V> readMemoryRDD(final Configuration configuration, final String memoryKey, final JavaSparkContext sparkContext) {
         if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
-        final String sideEffectRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), memoryKey);
+        Spark.create(sparkContext.sc()); // register the context before Spark.hasRDD(...) is consulted, as readGraphRDD does
+        final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+        final String memoryRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getMemoryLocation(inputLocation, memoryKey);
-        Spark.create(sparkContext.sc());
-        return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(sideEffectRDDName).toJavaRDD());
+        return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(memoryRDDName).toJavaRDD());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
new file mode 100644
index 0000000..2db267f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.collection.JavaConversions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A {@link Storage} implementation over the RDDs registered with {@link Spark}: each RDD name serves as a
+ * storage location. Location patterns given to {@link #ls(String)} and {@link #rmr(String)} treat {@code *}
+ * as "any character sequence" and every other character literally.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkContextStorage implements Storage {
+
+    private SparkContextStorage() {
+
+    }
+
+    /**
+     * Returns a storage view over whatever context is already registered with {@link Spark}; no context is created.
+     */
+    public static SparkContextStorage open() {
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Registers a context for the given Spark {@code master} via {@code Spark.create(master)} and returns a view over it.
+     */
+    public static SparkContextStorage open(final String master) {
+        Spark.create(master);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Registers a context built from {@code configuration} via {@code Spark.create(configuration)} and returns a view over it.
+     */
+    public static SparkContextStorage open(final Configuration configuration) {
+        Spark.create(configuration);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Registers {@code sparkContext} via {@code Spark.create(sparkContext)} and returns a view over it.
+     */
+    public static SparkContextStorage open(final SparkContext sparkContext) {
+        Spark.create(sparkContext);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Lists every named RDD registered with {@link Spark}; equivalent to {@code ls("*")}.
+     */
+    @Override
+    public List<String> ls() {
+        return ls("*");
+    }
+
+    /**
+     * Lists the RDDs whose name matches the {@code *}-wildcard {@code location}, each rendered as
+     * {@code "name [storage level description]"}.
+     */
+    @Override
+    public List<String> ls(final String location) {
+        final List<String> rdds = new ArrayList<>();
+        for (final RDD<?> rdd : matchingRDDs(location)) {
+            rdds.add(rdd.name() + " [" + rdd.getStorageLevel().description() + "]");
+        }
+        return rdds;
+    }
+
+    @Override
+    public boolean mkdir(final String location) {
+        throw new UnsupportedOperationException("This operation does not make sense for a persisted SparkContext");
+    }
+
+    /**
+     * Copies the RDD named {@code fromLocation} into a new RDD named {@code toLocation}, persisted at the
+     * source's storage level. Unlike a rename, the source RDD is left in place.
+     *
+     * @return {@code false} if there is no RDD named {@code fromLocation}, otherwise {@code true}
+     */
+    @Override
+    public boolean cp(final String fromLocation, final String toLocation) {
+        if (!Spark.hasRDD(fromLocation))
+            return false;
+        copyRDD(Spark.getRDD(fromLocation), toLocation);
+        return true;
+    }
+
+    @Override
+    public boolean exists(final String location) {
+        return Spark.hasRDD(location);
+    }
+
+    @Override
+    public boolean rm(final String location) {
+        if (!Spark.hasRDD(location))
+            return false;
+        Spark.removeRDD(location);
+        return true;
+    }
+
+    /**
+     * Removes every RDD whose name matches the {@code *}-wildcard {@code location}.
+     *
+     * @return {@code true} if at least one RDD matched and was removed
+     */
+    @Override
+    public boolean rmr(final String location) {
+        // all matches are collected before anything is removed, so Spark's RDD listing is not mutated mid-iteration
+        final List<RDD<?>> rdds = matchingRDDs(location);
+        for (final RDD<?> rdd : rdds) {
+            Spark.removeRDD(rdd.name());
+        }
+        return !rdds.isEmpty();
+    }
+
+    /**
+     * Returns at most {@code totalLines} elements of the RDD named {@code location}, read through
+     * {@code RDD.toLocalIterator()}. Elements are not checked against {@code objectClass}.
+     *
+     * @throws IllegalArgumentException if there is no RDD named {@code location}
+     */
+    @Override
+    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
+        @SuppressWarnings("unchecked") // element type is trusted to the caller, as objectClass is not verified
+        final Iterator<V> elements = (Iterator<V>) JavaConversions.asJavaIterator(getExistingRDD(location).toLocalIterator());
+        return IteratorUtils.limit(elements, totalLines);
+    }
+
+    /**
+     * Returns {@code RDD.toDebugString()} for the RDD named {@code location}.
+     *
+     * @throws IllegalArgumentException if there is no RDD named {@code location}
+     */
+    public String describe(final String location) {
+        return getExistingRDD(location).toDebugString();
+    }
+
+    /** Looks up the RDD named {@code location}, failing with a descriptive message instead of an NPE when absent. */
+    private static RDD<?> getExistingRDD(final String location) {
+        if (!Spark.hasRDD(location))
+            throw new IllegalArgumentException("There is no RDD named " + location + " in the SparkContext");
+        return Spark.getRDD(location);
+    }
+
+    /** Returns the registered RDDs whose (non-null) name matches the {@code *}-wildcard {@code location}. */
+    private static List<RDD<?>> matchingRDDs(final String location) {
+        final Pattern pattern = Pattern.compile(wildcardToRegex(location));
+        final List<RDD<?>> matches = new ArrayList<>();
+        for (final RDD<?> rdd : Spark.getRDDs()) {
+            if (null != rdd.name() && pattern.matcher(rdd.name()).matches())
+                matches.add(rdd);
+        }
+        return matches;
+    }
+
+    /**
+     * Converts a location whose only wildcard is {@code *} into an equivalent regular expression, quoting all
+     * other characters so that {@code .}, {@code +}, {@code (}, {@code $}, etc. in RDD names match literally.
+     */
+    private static String wildcardToRegex(final String location) {
+        final String[] literals = location.split("\\*", -1);
+        final StringBuilder regex = new StringBuilder();
+        for (int i = 0; i < literals.length; i++) {
+            if (i > 0)
+                regex.append(".*");
+            if (!literals[i].isEmpty())
+                regex.append(Pattern.quote(literals[i]));
+        }
+        return regex.toString();
+    }
+
+    /** Persists a copy of {@code source} named {@code targetName}, leaving {@code source} untouched. */
+    private static <T> void copyRDD(final RDD<T> source, final String targetName) {
+        // filter(...) yields a distinct RDD, so naming/persisting it neither renames the source nor re-persists it
+        // (re-persisting an already-persisted RDD at another level throws); count() computes the copy eagerly.
+        source.toJavaRDD().filter(element -> true).setName(targetName).persist(source.getStorageLevel()).count();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
index ccff1ab..3fc2a59 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
@@ -19,10 +19,21 @@
 
 package org.apache.tinkerpop.gremlin.spark;
 
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.junit.After;
 import org.junit.Before;
 
@@ -43,4 +54,23 @@ public abstract class AbstractSparkTest {
         Spark.close();
         System.out.println("SparkContext has been closed for " + this.getClass().getCanonicalName() + "-setupTest");
     }
+
+    /**
+     * Builds a local Spark/HadoopGraph configuration that reads from {@code inputLocation} and writes through
+     * {@link PersistedOutputRDD}.
+     *
+     * @param inputLocation a path containing ".kryo" (read with {@link GryoInputFormat}), ".json" (read with
+     *                      {@link GraphSONInputFormat}), or otherwise the name of an RDD read via {@link PersistedInputRDD}
+     * @return the configuration, e.g. for {@code GraphFactory.open(...)}
+     */
+    protected Configuration getBaseConfiguration(final String inputLocation) {
+        final BaseConfiguration configuration = new BaseConfiguration();
+        configuration.setDelimiterParsingDisabled(true);
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        // use the caller's location: the input format/RDD selected below is derived from it, so they must agree
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        if (inputLocation.contains(".kryo"))
+            configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        else if (inputLocation.contains(".json"))
+            configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GraphSONInputFormat.class.getCanonicalName());
+        else
+            configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        return configuration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
index 0b60825..7574908 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
@@ -34,9 +34,11 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -97,9 +99,9 @@ public class SparkGremlinPluginTest extends AbstractSparkTest {
         assertEquals(1, ((List<String>) this.console.eval("spark.ls()")).size());
         assertTrue(((List<String>) this.console.eval("spark.ls()")).contains("target/test-output/graph-1/~g [Memory Deserialized 1x Replicated]"));
 
-        assertEquals(6, ((List<Object>) this.console.eval("spark.head('target/test-output/graph-1/~g')")).size());
+        assertEquals(6, IteratorUtils.count(((Iterator<Object>) this.console.eval("spark.head('target/test-output/graph-1/~g')"))));
 
-        this.console.eval("spark.rm('target/test-output/graph-*')");
+        this.console.eval("spark.rmr('target/test-output/graph-*')");
         assertEquals(0, ((List<String>) this.console.eval("spark.ls()")).size());
 
         //////
@@ -116,9 +118,9 @@ public class SparkGremlinPluginTest extends AbstractSparkTest {
         this.console.eval("graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().iterations(1).create()).submit().get()");
 
         assertEquals(3, ((List<String>) this.console.eval("spark.ls()")).size());
-        this.console.eval("spark.rm('target/test-output/graph-*')");
+        this.console.eval("spark.rmr('target/test-output/graph-*')");
         assertEquals(1, ((List<String>) this.console.eval("spark.ls()")).size());
-        this.console.eval("spark.rm('*')");
+        this.console.eval("spark.rmr('*')");
         assertEquals(0, ((List<String>) this.console.eval("spark.ls()")).size());
 
         //

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
new file mode 100644
index 0000000..10153b0
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Runs PeerPressure with ClusterCountMapReduce on {@link SparkGraphComputer}, writing through
+ * {@link PersistedOutputRDD}, then checks via {@link SparkContextStorage} that both the graph RDD and the
+ * "clusterCount" memory RDD are available afterwards.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphMemorySparkTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldPersistGraphAndMemory() throws Exception {
+        final String outputLocation = "target/test-output/" + UUID.randomUUID();
+        // getBaseConfiguration(...) already selects PersistedOutputRDD as the graph output RDD
+        final Configuration configuration = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        // presumably keeps the SparkContext (and its persisted RDDs) alive after the job; the Storage checks below rely on it
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        /////
+        final Graph graph = GraphFactory.open(configuration);
+        final ComputerResult result = graph.compute(SparkGraphComputer.class)
+                .program(PeerPressureVertexProgram.build().create(graph))
+                .mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create())
+                .submit().get();
+        /////
+        final Storage storage = SparkContextStorage.open("local[4]");
+        final String graphLocation = Constants.getGraphLocation(outputLocation);
+        final String memoryLocation = Constants.getMemoryLocation(outputLocation, "clusterCount");
+
+        // two RDDs are expected: the graph and the clusterCount memory
+        assertEquals(2, storage.ls().size());
+        // TEST GRAPH PERSISTENCE
+        assertTrue(storage.exists(graphLocation));
+        assertEquals(6, IteratorUtils.count(storage.head(graphLocation, Tuple2.class)));
+        assertEquals(6, result.graph().traversal().V().count().next().longValue());
+        assertEquals(0, result.graph().traversal().E().count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+        /////
+        // TEST MEMORY PERSISTENCE
+        assertEquals(2, (int) result.memory().get("clusterCount"));
+        assertTrue(storage.exists(memoryLocation));
+        assertEquals(2, storage.head(memoryLocation, Tuple2.class).next()._2());
+    }
+
+}


Mime
View raw message