tinkerpop-commits mailing list archives

From ok...@apache.org
Subject [25/50] [abbrv] tinkerpop git commit: We now have PartitionInputFormat which allows an Hadoop-based GraphComputer to pull data from any Graph implementation. PartitionInputSplit is basically a wrapper around Partition so that data access is data-local an
Date Fri, 13 Jan 2017 18:56:41 GMT
We now have PartitionerInputFormat, which allows a Hadoop-based GraphComputer to pull data from
any Graph implementation. PartitionerInputSplit is basically a wrapper around Partition so that
data access is data-local; thus, for distributed Graph databases, the reader simply calls Partition.vertices()
and turns the results into VertexWritables. I have a test case that has SparkGraphComputer working
over TinkerGraph. Pretty neato. Still lots more cleaning and work to do, but this is a good
breather point.
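
As a rough illustration of the idea above (not the code in this commit), the split/reader
handshake can be sketched in plain Java with no Hadoop or TinkerPop dependency. All names here
(PartitionSplitSketch, SimplePartition, SimpleSplit, findPartition, readSplit) are hypothetical
stand-ins, and vertices are reduced to strings. The sketch assumes the split ships only the
partition's host and id, and that the reader side re-resolves the partition by id before
iterating its vertices.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionSplitSketch {

    // Hypothetical stand-in for a graph partition: an id, a host, and the vertices it owns.
    static final class SimplePartition {
        final String id;
        final String host;
        final List<String> vertices;

        SimplePartition(final String id, final String host, final List<String> vertices) {
            this.id = id;
            this.host = host;
            this.vertices = vertices;
        }
    }

    // Hypothetical stand-in for an input split: it carries only the host and the partition id,
    // never the vertex data itself.
    static final class SimpleSplit {
        String host;
        String id;

        SimpleSplit() {
            // no-arg constructor so a receiver can create an empty split and fill it in
        }

        SimpleSplit(final SimplePartition partition) {
            this.host = partition.host;
            this.id = partition.id;
        }

        void write(final DataOutputStream out) throws IOException {
            out.writeUTF(this.host);
            out.writeUTF(this.id);
        }

        void readFields(final DataInputStream in) throws IOException {
            this.host = in.readUTF();
            this.id = in.readUTF();
        }
    }

    // Linear lookup of a partition by id; fails loudly when the id is unknown.
    static SimplePartition findPartition(final List<SimplePartition> partitions, final String id) {
        for (final SimplePartition partition : partitions) {
            if (partition.id.equals(id))
                return partition;
        }
        throw new IllegalArgumentException("No partition with id " + id);
    }

    // Serializes a split to bytes and reads it back, as would happen between scheduler and worker.
    static SimpleSplit roundTrip(final SimpleSplit split) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                split.write(out);
            }
            final SimpleSplit copy = new SimpleSplit();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reader side: resolve the partition from the split's id, then copy out its vertices.
    static List<String> readSplit(final List<SimplePartition> partitions, final SimpleSplit split) {
        return new ArrayList<>(findPartition(partitions, split.id).vertices);
    }

    public static void main(final String[] args) {
        final List<SimplePartition> partitions = Arrays.asList(
                new SimplePartition("graph#0", "127.0.0.1", Arrays.asList("v1", "v3")),
                new SimplePartition("graph#1", "127.0.0.1", Arrays.asList("v2", "v4")));
        for (final SimplePartition partition : partitions) {
            final SimpleSplit split = roundTrip(new SimpleSplit(partition));
            System.out.println(split.host + "::" + split.id + " -> " + readSplit(partitions, split));
        }
    }
}
```

Keeping the split down to two strings means it stays cheap to serialize, and the id has to be
stable across processes for the lookup on the reader side to succeed.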


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5361addd
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5361addd
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5361addd

Branch: refs/heads/TINKERPOP-1564
Commit: 5361adddb674b9c6529cf45ca6b8bda09bd10744
Parents: 0c6b09b
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Fri Dec 16 09:03:03 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 .../gremlin/akka/process/actor/MasterActor.java |   2 +-
 .../gremlin/akka/process/actor/WorkerActor.java |   2 +-
 .../traversal/step/map/VertexProgramStep.java   |   8 +-
 .../computer/util/GraphComputerHelper.java      |  23 +++
 .../tinkerpop/gremlin/structure/Partition.java  |   2 +-
 .../gremlin/structure/Partitioner.java          |   8 +
 .../gremlin/structure/util/StringFactory.java   |   2 +-
 .../util/partitioner/GlobalPartitioner.java     |  12 +-
 .../util/partitioner/HashPartitioner.java       |  12 +-
 .../io/partitioner/PartitionerInputFormat.java  |  61 ++++++++
 .../io/partitioner/PartitionerInputSplit.java   |  79 ++++++++++
 .../io/partitioner/PartitionerRecordReader.java |  72 +++++++++
 .../process/computer/SparkGraphComputer.java    |   6 +-
 ...parkGraphPartitionerComputerProcessTest.java |  33 ++++
 .../TinkerGraphPartitionerProvider.java         | 155 +++++++++++++++++++
 .../process/computer/TinkerGraphComputer.java   |  14 +-
 17 files changed, 467 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d12d394..e9eadf7 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -37,7 +37,8 @@ TinkerPop 3.3.0 (Release Date: NOT OFFICIALLY RELEASED YET)
 * Added `ProcessTraversalStrategy` which is used to get cached strategies associated with a `Processor`.
 * Deprecated `Computer` in favor of `GraphComputer.open()`.
 * Deprecated `Graph.compute()` and `GraphComputer.submit()` in favor of `GraphComputer.submit(Graph)`.
->>>>>>> So much. withProcessor(Processor). No more Compute. Process.submit(Graph) as we are now staging it so that every Processor (GraphComputer/GraphAgents) can work over any Graph. This will all be via Partitioner. withComputer() deprecated. Lots of cool stuff with Process strategies -- ProcessorTraveralStrategy like VertexProgramStratgegy and ActorProgramStrategy work directly with TraversalStrategies.GlobalCache. So much other stuf... I forget. Check the CHANGELOG. This was a massive undertaking but, thank god, its all backwards compatible (though with deprecation).
+* Added `PartitionerInputFormat` which allows `SparkGraphComputer` and `GiraphGraphComputer` to pull data from any `Graph` implementation.
+* Added `PartitionerInputRDD` which allows `SparkGraphComputer` to pull data from any `Graph` implementation.
 * Updated Docker build scripts to include Python dependencies (NOTE: users should remove any previously generated TinkerPop Docker images).
 * Added "attachment requisite" `VertexProperty.element()` and `Property.element()` data in GraphSON serialization.
 * Added `Vertex`, `Edge`, `VertexProperty`, and `Property` serializers to Gremlin-Python and exposed tests that use graph object arguments.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index a4ef639..aa31c28 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -61,7 +61,7 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
         this.workers = new ArrayList<>();
         final List<Partition> partitions = partitioner.getPartitions();
         for (final Partition partition : partitions) {
-            final String workerPathString = "worker-" + partition.guid();
+            final String workerPathString = "worker-" + partition.id();
             this.workers.add(new Address.Worker(workerPathString, partition.location()));
             context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index 35b5a4f..27f942a 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -106,7 +106,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
     }
 
     private String createWorkerAddress(final Partition partition) {
-        return "../worker-" + partition.guid();
+        return "../worker-" + partition.id();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index d005940..b02a725 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -64,7 +64,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
             if (this.first && this.getPreviousStep() instanceof EmptyStep) {
                 this.first = false;
                 final Graph graph = this.getTraversal().getGraph().get();
-                future = this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit();
+                future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ?
+                        this.getComputer().apply(graph).program(this.generateProgram(graph, EmptyMemory.instance())).submit() :
+                        GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, EmptyMemory.instance())).submit(graph);
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
@@ -72,7 +74,9 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
                 final Traverser.Admin<ComputerResult> traverser = this.starts.next();
                 final Graph graph = traverser.get().graph();
                 final Memory memory = traverser.get().memory();
-                future = this.generateComputer(graph).program(this.generateProgram(graph, memory)).submit();
+                future = (this.getComputer().getGraphComputerClass().equals(GraphComputer.class)) ?
+                        this.getComputer().apply(graph).program(this.generateProgram(graph, memory)).submit() :
+                        GraphComputer.open(this.getComputer().configuration()).program(this.generateProgram(graph, memory)).submit(graph);
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return traverser.split(result, this);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
index dce1934..840c5da 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
@@ -18,13 +18,16 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer.util;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
 import java.lang.reflect.Method;
+import java.util.Iterator;
 import java.util.Optional;
 
 /**
@@ -68,6 +71,26 @@ public final class GraphComputerHelper {
         return persist.isPresent() ? persist.get() : vertexProgram.isPresent() ? vertexProgram.get().getPreferredPersist() : GraphComputer.Persist.NOTHING;
     }
 
+    public static GraphComputer configure(GraphComputer computer, final Configuration configuration) {
+        final Iterator<String> keys = configuration.getKeys();
+        while (keys.hasNext()) {
+            final String key = keys.next();
+            if (key.equals(GraphComputer.WORKERS))
+                computer = computer.workers(configuration.getInt(GraphComputer.WORKERS));
+            else if (key.equals(GraphComputer.RESULT))
+                computer = computer.result(GraphComputer.ResultGraph.valueOf(configuration.getString(GraphComputer.RESULT)));
+            else if (key.equals(GraphComputer.PERSIST))
+                computer = computer.persist(GraphComputer.Persist.valueOf(configuration.getString(GraphComputer.PERSIST)));
+            else if (key.equals(GraphComputer.VERTICES))
+                computer = computer.vertices((Traversal.Admin) configuration.getProperty(GraphComputer.VERTICES));
+            else if (key.equals(GraphComputer.EDGES))
+                computer = computer.edges((Traversal.Admin) configuration.getProperty(GraphComputer.EDGES));
+            else if (!key.equals(GraphComputer.GRAPH_COMPUTER))
+                computer = computer.configure(key, configuration.getProperty(key));
+        }
+        return computer;
+    }
+
     public static boolean areEqual(final MapReduce a, final Object b) {
         if (null == a)
             throw Graph.Exceptions.argumentCanNotBeNull("a");

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
index dbb5260..f20b9fb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
@@ -65,7 +65,7 @@ public interface Partition {
      *
      * @return the unique id of the partition
      */
-    public UUID guid();
+    public String id();
 
     /**
      * Get the {@link InetAddress} of the locations physical location.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
index 1d4aae1..2e2cdb7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partitioner.java
@@ -30,4 +30,12 @@ public interface Partitioner {
 
     public Partition getPartition(final Element element);
 
+    public default Partition getPartition(final String id) {
+        for(final Partition partition : this.getPartitions()) {
+            if(partition.id().equals(id))
+                return partition;
+        }
+        throw new IllegalArgumentException("The provided partition does not exist in the partitioner");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index 61d9551..f7c350d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -150,7 +150,7 @@ public final class StringFactory {
     }
 
     public static String partitionString(final Partition partition) {
-        return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.guid() + R_BRACKET;
+        return "partition" + L_BRACKET + partition.location().getHostAddress() + COLON + partition.id() + R_BRACKET;
     }
 
     public static String traversalSourceString(final TraversalSource traversalSource) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index 361750b..1d72a2d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -32,7 +32,6 @@ import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -63,11 +62,12 @@ public final class GlobalPartitioner implements Partitioner {
     private class GlobalPartition implements Partition {
 
         private final Graph graph;
-        private final UUID guid = UUID.randomUUID();
+        private final String id;
         private final InetAddress location;
 
         private GlobalPartition(final Graph graph) {
             this.graph = graph;
+            this.id = this.graph.toString();
             try {
                 this.location = InetAddress.getLocalHost();
             } catch (final UnknownHostException e) {
@@ -97,17 +97,17 @@ public final class GlobalPartitioner implements Partitioner {
 
         @Override
         public boolean equals(final Object other) {
-            return other instanceof Partition && ((Partition) other).guid().equals(this.guid);
+            return other instanceof Partition && ((Partition) other).id().equals(this.id);
         }
 
         @Override
         public int hashCode() {
-            return this.guid.hashCode() + this.location.hashCode();
+            return this.id.hashCode() + this.location.hashCode();
         }
 
         @Override
-        public UUID guid() {
-            return this.guid;
+        public String id() {
+            return this.id;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
index b3d3db7..15b4563 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/HashPartitioner.java
@@ -31,7 +31,6 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -72,12 +71,13 @@ public final class HashPartitioner implements Partitioner {
         private final Partition basePartition;
         private final int totalSplits;
         private final int splitId;
-        private final UUID guid = UUID.randomUUID();
+        private final String id;
 
         private HashPartition(final Partition basePartition, final int splitId, final int totalSplits) {
             this.basePartition = basePartition;
             this.totalSplits = totalSplits;
             this.splitId = splitId;
+            this.id = this.basePartition.id() + "#" + splitId;
         }
 
         @Override
@@ -102,17 +102,17 @@ public final class HashPartitioner implements Partitioner {
 
         @Override
         public boolean equals(final Object other) {
-            return other instanceof Partition && ((Partition) other).guid().equals(this.guid);
+            return other instanceof Partition && ((Partition) other).id().equals(this.id);
         }
 
         @Override
         public int hashCode() {
-            return this.guid.hashCode() + this.location().hashCode();
+            return this.id.hashCode() + this.location().hashCode();
         }
 
         @Override
-        public UUID guid() {
-            return this.guid;
+        public String id() {
+            return this.id;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
new file mode 100644
index 0000000..2dae040
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputFormat.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerInputFormat extends InputFormat<NullWritable, VertexWritable> {
+    @Override
+    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+        final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(jobContext.getConfiguration()));
+        final List<Partition> partitions = graph.partitioner().getPartitions();
+        final List<InputSplit> inputSplits = new ArrayList<>(partitions.size());
+        for (final Partition partition : partitions) {
+            inputSplits.add(new PartitionerInputSplit(partition));
+        }
+        return inputSplits;
+    }
+
+    @Override
+    public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        final PartitionerRecordReader reader = new PartitionerRecordReader();
+        reader.initialize(inputSplit, taskAttemptContext);
+        return reader;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
new file mode 100644
index 0000000..923f8b3
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerInputSplit.java
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerInputSplit extends InputSplit implements Writable {
+
+    private String location;
+    private String id;
+
+    public PartitionerInputSplit() {
+        // necessary for serialization/writable
+    }
+
+    public PartitionerInputSplit(final Partition partition) {
+        this.location = partition.location().getHostAddress();
+        this.id = partition.id();
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return 1;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[]{this.location};
+    }
+
+    public String getPartitionId() {
+        return this.id;
+    }
+
+    @Override
+    public void write(final DataOutput dataOutput) throws IOException {
+        WritableUtils.writeString(dataOutput, this.location);
+        WritableUtils.writeString(dataOutput, this.id);
+    }
+
+    @Override
+    public void readFields(final DataInput dataInput) throws IOException {
+        this.location = WritableUtils.readString(dataInput);
+        this.id = WritableUtils.readString(dataInput);
+    }
+
+
+    @Override
+    public String toString() {
+        return this.location + "::" + this.id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
new file mode 100644
index 0000000..c6de8ab
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/partitioner/PartitionerRecordReader.java
@@ -0,0 +1,72 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PartitionerRecordReader extends RecordReader<NullWritable, VertexWritable> {
+
+    private Iterator<Vertex> vertices;
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        final Graph graph = GraphFactory.open(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()));
+        this.vertices = graph.partitioner().getPartition(((PartitionerInputSplit) inputSplit).getPartitionId()).vertices();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        return this.vertices.hasNext();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    @Override
+    public VertexWritable getCurrentValue() throws IOException, InterruptedException {
+        return new VertexWritable(this.vertices.next());
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 61c9663..cbcdfe7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
@@ -143,8 +144,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     @Override
     public Future<ComputerResult> submit(final Graph graph) {
-        this.hadoopGraph = (HadoopGraph) graph;
-        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
+        ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration);
         return this.submit();
     }
 
@@ -154,7 +154,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     }
 
     public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
-        return HadoopGraph.open(configuration).compute(SparkGraphComputer.class);
+        return new SparkGraphComputer(HadoopGraph.open(configuration));
     }
 
     private Future<ComputerResult> submitWithExecutor(Executor exec) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
new file mode 100644
index 0000000..7c5b3e3
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/SparkGraphPartitionerComputerProcessTest.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = TinkerGraphPartitionerProvider.class, graph = TinkerGraph.class)
+public class SparkGraphPartitionerComputerProcessTest {
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
new file mode 100644
index 0000000..941c942
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/partitioner/TinkerGraphPartitionerProvider.java
@@ -0,0 +1,155 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.partitioner;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.partitioner.PartitionerInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
+public class TinkerGraphPartitionerProvider extends AbstractGraphProvider {
+
+    private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
+            "testProfileStrategyCallback",
+            "testProfileStrategyCallbackSideEffect",
+            "shouldSucceedWithProperTraverserRequirements",
+            "shouldFailWithImproperTraverserRequirements"));
+
+    private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+        add(TinkerEdge.class);
+        add(TinkerElement.class);
+        add(TinkerGraph.class);
+        add(TinkerGraphVariables.class);
+        add(TinkerProperty.class);
+        add(TinkerVertex.class);
+        add(TinkerVertexProperty.class);
+    }};
+
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName,
+                                                    final LoadGraphWith.GraphData loadGraphWith) {
+
+        final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith);
+        final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name();
+        final boolean skipTest = SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName());
+        return new HashMap<String, Object>() {{
+            put(Graph.GRAPH, TinkerGraph.class.getName());
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker);
+            put("skipTest", skipTest);
+            if (loadGraphWith == LoadGraphWith.GraphData.CREW)
+                put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name());
+            put("spark.master", "local[4]");
+            put("spark.serializer", GryoSerializer.class.getCanonicalName());
+            put("spark.kryo.registrationRequired", true);
+            put(Constants.GREMLIN_HADOOP_GRAPH_READER, PartitionerInputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+            put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+            put(GraphComputer.GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+            put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+            // System.out.println(AbstractGremlinTest.class.getResource(loadGraphWith.location()).toString().replace("file:",""));
+            put(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, AbstractGremlinTest.class.getResource(loadGraphWith.location()).toString().replace("file:", ""));
+        }};
+
+
+    }
+
+    protected void readIntoGraph(final Graph g, final String path) throws IOException {
+
+    }
+
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) throws Exception {
+        //if (graph != null)
+        //    graph.close();
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    /**
+     * Test that load with specific graph data can be configured with a specific id manager as the data type to
+     * be used in the test for that graph is known.
+     */
+    protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) {
+        if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
+        if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL))
+            return TinkerGraph.DefaultIdManager.INTEGER;
+        else
+            throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
+    }
+
+    /////////////////////////////
+    /////////////////////////////
+    /////////////////////////////
+
+    @Override
+    public GraphTraversalSource traversal(final Graph graph) {
+        if ((Boolean) graph.configuration().getProperty("skipTest"))
+            return graph.traversal().withComputer();
+        else {
+            return graph.traversal().withProcessor(SparkGraphComputer.open(graph.configuration()));
+        }
+    }
+
+    @Override
+    public GraphComputer getGraphComputer(final Graph graph) {
+        return SparkGraphComputer.open(graph.configuration());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5361addd/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 82cb934..cbbe53f 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -69,7 +69,7 @@ public final class TinkerGraphComputer implements GraphComputer {
     private Persist persist = null;
 
     private VertexProgram<?> vertexProgram;
-    private final TinkerGraph graph;
+    private TinkerGraph graph;
     private TinkerMemory memory;
     private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
     private boolean executed = false;
@@ -97,6 +97,7 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.graph = null;
         this.configuration = configuration;
         this.configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
+        GraphComputerHelper.configure(this, ConfigurationUtils.cloneConfiguration(configuration));
     }
 
     public static TinkerGraphComputer open(final Configuration configuration) {
@@ -104,10 +105,7 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     public static TinkerGraphComputer open() {
-        final BaseConfiguration configuration = new BaseConfiguration();
-        configuration.setDelimiterParsingDisabled(true);
-        configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName());
-        return new TinkerGraphComputer(configuration);
+        return new TinkerGraphComputer(new BaseConfiguration());
     }
 
     @Override
@@ -164,6 +162,12 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public Future<ComputerResult> submit(final Graph graph) {
+        this.graph = (TinkerGraph) graph;
+        return this.submit();
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)

