tinkerpop-commits mailing list archives

From ok...@apache.org
Subject [01/18] incubator-tinkerpop git commit: Created the first GraphComputer Provider TraversalStrategy -- SparkPartitionAwareStrategy. This strategy analyzes the traversal to determine if partitioning should be skipped -- e.g. if no message passing is happening
Date Fri, 06 May 2016 16:08:55 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 55172b447 -> e560970f4


Created the first GraphComputer Provider TraversalStrategy -- SparkPartitionAwareStrategy.
This strategy analyzes the traversal to determine whether partitioning should be skipped -- e.g.
when no message passing is happening. I think this strategy will start to set the stage for how
future GraphComputer provider-specific strategies will work.
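
For a rough illustration of the effect -- the traversals below are taken from the new
SparkPartitionAwareStrategyTest and assume a GraphTraversalSource g backed by SparkGraphComputer:

    g.V().count()          // no adjacency is traversed, so no message pass is needed; the strategy
                           // configures gremlin.spark.skipPartitioner=true and partitioning is skipped
    g.V().both().count()   // both() reaches adjacent vertices, which requires a message pass, so the
                           // graphRDD is partitioned as before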


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/77480a2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/77480a2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/77480a2c

Branch: refs/heads/master
Commit: 77480a2c7875e284b063ecca434bf2165116689d
Parents: 76d5be2
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue May 3 14:14:29 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Tue May 3 14:14:29 2016 -0600

----------------------------------------------------------------------
 .../gremlin/process/computer/Computer.java      |  75 +++++++++---
 .../traversal/TraversalVertexProgram.java       |  11 +-
 .../process/traversal/TraversalSource.java      |   4 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |   1 +
 .../optimization/Neo4jGraphStepStrategy.java    |   2 +-
 .../spark/process/computer/SparkExecutor.java   |  68 ++++++-----
 .../process/computer/SparkGraphComputer.java    |  22 +++-
 .../SparkPartitionAwareStrategy.java            |  86 ++++++++++++++
 .../SparkPartitionAwareStrategyTest.java        | 115 +++++++++++++++++++
 9 files changed, 331 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
index 4f416dc..ce31d5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
@@ -32,7 +32,7 @@ import java.util.function.Function;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class Computer implements Function<Graph, GraphComputer>, Serializable {
+public final class Computer implements Function<Graph, GraphComputer>, Serializable, Cloneable {
 
     private Class<? extends GraphComputer> graphComputerClass = GraphComputer.class;
     private Map<String, Object> configuration = new HashMap<>();
@@ -55,41 +55,42 @@ public final class Computer implements Function<Graph, GraphComputer>, Serializable {
     }
 
     public Computer configure(final String key, final Object value) {
-        this.configuration.put(key, value);
-        return this;
+        final Computer clone = this.clone();
+        clone.configuration.put(key, value);
+        return clone;
     }
 
     public Computer workers(final int workers) {
-        this.workers = workers;
-        return this;
+        final Computer clone = this.clone();
+        clone.workers = workers;
+        return clone;
     }
 
     public Computer persist(final GraphComputer.Persist persist) {
-        this.persist = persist;
-        return this;
+        final Computer clone = this.clone();
+        clone.persist = persist;
+        return clone;
     }
 
 
     public Computer result(final GraphComputer.ResultGraph resultGraph) {
-        this.resultGraph = resultGraph;
-        return this;
+        final Computer clone = this.clone();
+        clone.resultGraph = resultGraph;
+        return clone;
     }
 
     public Computer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
-        this.vertices = vertexFilter;
-        return this;
+        final Computer clone = this.clone();
+        clone.vertices = vertexFilter;
+        return clone;
     }
 
     public Computer edges(final Traversal<Vertex, Edge> edgeFilter) {
-        this.edges = edgeFilter;
-        return this;
+        final Computer clone = this.clone();
+        clone.edges = edgeFilter;
+        return clone;
     }
 
-    public Class<? extends GraphComputer> getGraphComputerClass() {
-        return this.graphComputerClass;
-    }
-
-    @Override
     public GraphComputer apply(final Graph graph) {
         GraphComputer computer = this.graphComputerClass.equals(GraphComputer.class) ? graph.compute() : graph.compute(this.graphComputerClass);
         for (final Map.Entry<String, Object> entry : this.configuration.entrySet()) {
@@ -112,4 +113,42 @@ public final class Computer implements Function<Graph, GraphComputer>, Serializable {
     public String toString() {
         return this.graphComputerClass.getSimpleName().toLowerCase();
     }
+
+    @Override
+    public Computer clone() {
+        try {
+            final Computer clone = (Computer) super.clone();
+            clone.configuration = new HashMap<>(this.configuration);
+            if (null != this.vertices)
+                clone.vertices = this.vertices.asAdmin().clone();
+            if (null != this.edges)
+                clone.edges = this.edges.asAdmin().clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage());
+        }
+    }
+
+    /////////////////
+    /////////////////
+
+    public Class<? extends GraphComputer> getGraphComputerClass() {
+        return this.graphComputerClass;
+    }
+
+    public Map<String,Object> getConfiguration()  {
+        return this.configuration;
+    }
+
+    public Traversal<Vertex,Vertex> getVertices() {
+        return this.vertices;
+    }
+
+    public Traversal<Vertex,Edge> getEdges() {
+        return this.edges;
+    }
+
+    public int getWorkers() {
+        return this.workers;
+    }
 }
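
For context, with the change above each Computer mutator returns a configured clone instead of
mutating the instance in place; a minimal sketch of the resulting usage (variable names are
illustrative only):

    final Computer base = Computer.compute(SparkGraphComputer.class);
    final Computer tuned = base.workers(4).configure("gremlin.spark.skipPartitioner", true);
    // base is left untouched; tuned carries the worker count and the extra configuration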

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 8e4a75e..166625a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
@@ -59,7 +60,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSid
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
@@ -133,6 +133,15 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         return VertexProgram.<TraversalVertexProgram>createVertexProgram(graph, configuration).traversal.get();
     }
 
+    /**
+     * Get the {@link Traversal} associated with the current instance of the traversal vertex program.
+     *
+     * @return the traversal of the instantiated program
+     */
+    public Traversal.Admin<?, ?> getTraversal() {
+        return this.traversal.get();
+    }
+
     @Override
     public void loadState(final Graph graph, final Configuration configuration) {
         if (!configuration.containsKey(TRAVERSAL))

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
index 98163c7..3b60940 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
@@ -98,11 +98,11 @@ public interface TraversalSource extends Cloneable {
      * @return a new traversal source with updated strategies
      */
     public default TraversalSource withComputer(final Computer computer) {
-        final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(computer.getGraphComputerClass()).toList();
+        final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(computer.apply(this.getGraph()).getClass()).toList();
         final TraversalStrategy[] traversalStrategies = new TraversalStrategy[graphComputerStrategies.size() + 1];
         traversalStrategies[0] = new VertexProgramStrategy(computer);
         for (int i = 0; i < graphComputerStrategies.size(); i++) {
-            traversalStrategies[i+1] = graphComputerStrategies.get(i);
+            traversalStrategies[i + 1] = graphComputerStrategies.get(i);
         }
         return this.withStrategies(traversalStrategies);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 8d5bb21..872b3fb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -57,6 +57,7 @@ public final class Constants {
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
     public static final String GREMLIN_SPARK_GRAPH_STORAGE_LEVEL = "gremlin.spark.graphStorageLevel";
     public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
+    public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner";
     public static final String SPARK_SERIALIZER = "spark.serializer";
 
     public static String getGraphLocation(final String location) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/neo4j-gremlin/src/main/java/org/apache/tinkerpop/gremlin/neo4j/process/traversal/strategy/optimization/Neo4jGraphStepStrategy.java
----------------------------------------------------------------------
diff --git a/neo4j-gremlin/src/main/java/org/apache/tinkerpop/gremlin/neo4j/process/traversal/strategy/optimization/Neo4jGraphStepStrategy.java
b/neo4j-gremlin/src/main/java/org/apache/tinkerpop/gremlin/neo4j/process/traversal/strategy/optimization/Neo4jGraphStepStrategy.java
index 1d2e24a..eed7b15 100644
--- a/neo4j-gremlin/src/main/java/org/apache/tinkerpop/gremlin/neo4j/process/traversal/strategy/optimization/Neo4jGraphStepStrategy.java
+++ b/neo4j-gremlin/src/main/java/org/apache/tinkerpop/gremlin/neo4j/process/traversal/strategy/optimization/Neo4jGraphStepStrategy.java
@@ -31,7 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
  * @author Pieter Martin
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class Neo4jGraphStepStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> {
+public final class Neo4jGraphStepStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {
 
     private static final Neo4jGraphStepStrategy INSTANCE = new Neo4jGraphStepStrategy();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 3e0f09a..aadb70d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 import com.google.common.base.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -78,7 +80,8 @@ public final class SparkExecutor {
             final SparkMemory memory,
             final Configuration apacheConfiguration) {
 
-        if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
+        final boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
+        if (partitionedGraphRDD && null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
             assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
         final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
@@ -119,34 +122,46 @@ public final class SparkExecutor {
                     });
                 }, true)); // true means that the partition is preserved
         // the graphRDD and the viewRDD must have the same partitioner
-        assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
+        if (partitionedGraphRDD)
+            assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
-                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+
+        /////////////////////////////////////////////////////////////
+        /////////////////////////////////////////////////////////////
+        final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
+                tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
-                    if (a instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
-                        return a;
-                    } else if (b instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
-                        return b;
-                    } else {
-                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
-                        c.mergePayload(a, messageCombiner);
-                        c.mergePayload(b, messageCombiner);
-                        return c;
-                    }
-                })
-                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
+        final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+            if (a instanceof ViewIncomingPayload) {
+                ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+                return a;
+            } else if (b instanceof ViewIncomingPayload) {
+                ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+                return b;
+            } else {
+                final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+                c.mergePayload(a, messageCombiner);
+                c.mergePayload(b, messageCombiner);
+                return c;
+            }
+        };
+        /////////////////////////////////////////////////////////////
+        /////////////////////////////////////////////////////////////
+
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD =
+                (partitionedGraphRDD ?
+                        viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(graphRDD.partitioner().get(), reducerFunction) :
+                        viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(reducerFunction))
+                        .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
+                        .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
+                        .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+                                (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
+                                new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
         // the graphRDD and the viewRDD must have the same partitioner
-        assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
+        if (partitionedGraphRDD)
+            assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
@@ -156,7 +171,8 @@ public final class SparkExecutor {
 
     public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
-        assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
+        if (graphRDD.partitioner().isPresent())
+            assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
         // attach the final computed view to the cached graph
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 928a880..cd01779 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -51,7 +51,9 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.SparkPartitionAwareStrategy;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
@@ -80,6 +82,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     private final org.apache.commons.configuration.Configuration sparkConfiguration;
     private boolean workersSet = false;
 
+    static {
+        TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class, TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(SparkPartitionAwareStrategy.instance()));
+    }
+
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
         this.sparkConfiguration = new HadoopConfiguration();
@@ -124,6 +130,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
+            final boolean skipPartitioner = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
             String inputLocation = null;
             if (inputFromSpark)
                 inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
@@ -202,12 +209,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 if (loadedGraphRDD.partitioner().isPresent())
                     this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
                 else {
-                    final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
-                    this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
-                    loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
-                    partitioned = true;
+                    if (!skipPartitioner) {
+                        final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
+                        this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
+                        loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
+                        partitioned = true;
+                        assert loadedGraphRDD.partitioner().isPresent();
+                    } else {
+                        assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
+                        this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                    }
                 }
-                assert loadedGraphRDD.partitioner().isPresent();
                 // if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
                 if (this.workersSet) {
                     if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
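
For context, the static block added above registers SparkPartitionAwareStrategy in the
GlobalCache for SparkGraphComputer, so the strategy is picked up automatically -- a minimal
sketch, assuming a HadoopGraph configured to use SparkGraphComputer (mirroring the new test):

    final GraphTraversalSource g = graph.traversal().withComputer(SparkGraphComputer.class);
    assert g.getStrategies().toList().contains(SparkPartitionAwareStrategy.instance());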

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
new file mode 100644
index 0000000..02aa1ae
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkPartitionAwareStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {
+
+    private static final SparkPartitionAwareStrategy INSTANCE = new SparkPartitionAwareStrategy();
+
+    private static final Set<Class<? extends Step>> MESSAGE_PASS_CLASSES = new HashSet<>(Arrays.asList(
+            EdgeVertexStep.class,
+            LambdaMapStep.class, // maybe?
+            LambdaFlatMapStep.class // maybe?
+            // VertexStep is special as you need to see if the return class is Edge or Vertex (logic below)
+    ));
+
+    private SparkPartitionAwareStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
+        for (final TraversalVertexProgramStep step : steps) {
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(traversal.getGraph().orElse(EmptyGraph.instance())).getTraversal();
+            boolean messagePasses = MESSAGE_PASS_CLASSES.stream()
+                    .flatMap(clazz -> TraversalHelper.<Step<?, ?>>getStepsOfAssignableClassRecursively((Class) clazz, computerTraversal).stream())
+                    .filter(s -> TraversalHelper.isGlobalChild(((Step) s).getTraversal().asAdmin()))
+                    .findAny()
+                    .isPresent();
+            if (!messagePasses) {
+                for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(VertexStep.class, computerTraversal)) {
+                    if (TraversalHelper.isGlobalChild(vertexStep.getTraversal()) &&
+                            (vertexStep.returnsVertex() || !vertexStep.getDirection().equals(Direction.OUT))) { // in edges require message pass in OLAP
+                        messagePasses = true;
+                        break;
+                    }
+                }
+            }
+            if (!messagePasses)  // if no message passing, don't partition (save time)
+                step.setComputer(step.getComputer().configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true));
+        }
+    }
+
+    public static SparkPartitionAwareStrategy instance() {
+        return INSTANCE;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/77480a2c/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategyTest.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategyTest.java
new file mode 100644
index 0000000..4324202
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategyTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.Computer;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkPartitionAwareStrategyTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldPartitionTheInputRDDAccordingly() throws Exception {
+        final String outputLocation = TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString());
+        Configuration configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+        ///
+        Graph graph = GraphFactory.open(configuration);
+        GraphTraversalSource g = graph.traversal().withComputer();
+        assertTrue(g.getStrategies().toList().contains(SparkPartitionAwareStrategy.instance()));
+        assertTrue(g.V().count().explain().toString().contains(SparkPartitionAwareStrategy.class.getSimpleName()));
+        //
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(2l, g.V().out().out().count().next().longValue());
+    }
+
+
+    @Test
+    public void shouldSetConfigurationsCorrectly() throws Exception {
+        final String outputLocation = TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString());
+        Configuration configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+
+        Graph graph = GraphFactory.open(configuration);
+        GraphTraversalSource g = graph.traversal().withComputer(Computer.compute().persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW));
+
+        assertTrue(skipPartitioner(g.V().limit(10)));
+        assertTrue(skipPartitioner(g.V().values("age").groupCount()));
+        assertTrue(skipPartitioner(g.V().groupCount().by(__.out().count())));
+        assertTrue(skipPartitioner(g.V().outE()));
+        assertTrue(skipPartitioner(g.V().count()));
+        assertTrue(skipPartitioner(g.V().out().count()));
+        assertTrue(skipPartitioner(g.V().local(__.inE()).count()));
+        assertTrue(skipPartitioner(g.V().outE().inV().count()));
+        ////
+        assertFalse(skipPartitioner(g.V().outE().inV()));
+        assertFalse(skipPartitioner(g.V().both()));
+        assertFalse(skipPartitioner(g.V().both().count()));
+        assertFalse(skipPartitioner(g.V().out().id()));
+        assertFalse(skipPartitioner(g.V().out().out().count()));
+        assertFalse(skipPartitioner(g.V().in().count()));
+        assertFalse(skipPartitioner(g.V().inE().count()));
+
+
+    }
+
+    private static boolean skipPartitioner(final Traversal<?, ?> traversal) {
+        traversal.asAdmin().applyStrategies();
+        return (Boolean) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal.asAdmin()).get()
+                .getComputer()
+                .getConfiguration()
+                .getOrDefault(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
+
+    }
+
+}


