tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject incubator-tinkerpop git commit: Added a lot more documentation on TraversalVertexProgram; I also combed through the code and was able to find numerous minor optimizations here and there.
Date Wed, 25 May 2016 20:54:21 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1310 7255844c0 -> e6f2caa89


Added a lot more documentation on TraversalVertexProgram. I also combed through the code
and was able to find numerous minor optimizations here and there.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e6f2caa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e6f2caa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e6f2caa8

Branch: refs/heads/TINKERPOP-1310
Commit: e6f2caa89adc8e6630dadc613ca9b6b92416c223
Parents: 7255844
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed May 25 14:54:15 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed May 25 14:54:15 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      | 47 +++++++++-----------
 .../traversal/TraversalVertexProgram.java       |  2 +
 .../computer/traversal/WorkerExecutor.java      | 32 ++++++-------
 .../gremlin/hadoop/structure/HadoopGraph.java   |  2 +-
 .../HaltedTraverserFactoryStrategyTest.java     |  7 ++-
 5 files changed, 43 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index b994f1e..1c1e9d2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -43,7 +43,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.HashSet;
 import java.util.Iterator;
@@ -70,20 +69,17 @@ final class MasterExecutor {
         return traverser;
     }
 
-    // handle traversers and data that were sent from the workers to the master traversal
via memory
-    protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix,
final Memory memory, final TraverserSet<Object> traverserSet, final Set<String>
completedBarriers) {
+    protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix,
final Memory memory, final TraverserSet<Object> toProcessTraversers, final Set<String>
completedBarriers) {
+        // handle traversers and data that were sent from the workers to the master traversal
via memory
         if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
             for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS))
{
                 final Step<Object, Object> step = traversalMatrix.getStepById(key);
-                if (null == step) continue; // why? how can this happen?
                 assert step instanceof Barrier;
                 completedBarriers.add(step.getId());
                 if (!(step instanceof LocalBarrier)) {  // local barriers don't do any processing
on the master traversal (they just lock on the workers)
                     final Barrier<Object> barrier = (Barrier<Object>) step;
                     barrier.addBarrier(memory.get(key));
-                    while (step.hasNext()) {
-                        traverserSet.add(step.next());
-                    }
+                    step.forEachRemaining(toProcessTraversers::add);
                     // if it was a reducing barrier step, reset the barrier to its seed value
                     if (step instanceof ReducingBarrierStep)
                         memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get());
@@ -100,34 +96,33 @@ final class MasterExecutor {
                                             final TraverserSet<Object> haltedTraversers,
                                             final Class haltedTraverserFactory) {
 
-
         while (!toProcessTraversers.isEmpty()) {
             final TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
             Step<Object, Object> previousStep = EmptyStep.instance();
             Step<Object, Object> currentStep = EmptyStep.instance();
 
-            final Iterator<Traverser.Admin<Object>> traversers = IteratorUtils.removeOnNext(toProcessTraversers.iterator());
+            // these are traversers that are at the master traversal and will either halt
here or be distributed back to the workers as needed
+            final Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
             while (traversers.hasNext()) {
                 final Traverser.Admin<Object> traverser = traversers.next();
-                traverser.set(DetachedFactory.detach(traverser.get(), true));
+                traversers.remove();
+                traverser.set(DetachedFactory.detach(traverser.get(), true)); // why?
                 traverser.setSideEffects(traversal.get().getSideEffects());
-                if (traverser.isHalted()) {
+                if (traverser.isHalted())
                     haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
-                } else if (isRemoteTraverser(traverser, traversalMatrix)) {  // this is so
that patterns like order().name work as expected.
+                else if (isRemoteTraverser(traverser, traversalMatrix))  // this is so that
patterns like order().name work as expected. try and stay local as long as possible
                     remoteActiveTraversers.add(traverser.detach());
-                } else {
+                else {
                     currentStep = traversalMatrix.getStepById(traverser.getStepId());
                     if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep
instanceof EmptyStep)) {
                         while (previousStep.hasNext()) {
                             final Traverser.Admin<Object> result = previousStep.next();
-                            if (result.isHalted()) {
+                            if (result.isHalted())
                                 haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory));
-                            } else {
-                                if (isRemoteTraverser(result, traversalMatrix)) {
-                                    remoteActiveTraversers.add(result.detach());
-                                } else
-                                    localActiveTraversers.add(result);
-                            }
+                            else if (isRemoteTraverser(result, traversalMatrix))
+                                remoteActiveTraversers.add(result.detach());
+                            else
+                                localActiveTraversers.add(result);
                         }
                     }
                     currentStep.addStart(traverser);
@@ -137,14 +132,12 @@ final class MasterExecutor {
             if (!(currentStep instanceof EmptyStep)) {
                 while (currentStep.hasNext()) {
                     final Traverser.Admin<Object> traverser = currentStep.next();
-                    if (traverser.isHalted()) {
+                    if (traverser.isHalted())
                         haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
-                    } else {
-                        if (isRemoteTraverser(traverser, traversalMatrix)) {
-                            remoteActiveTraversers.add(traverser.detach());
-                        } else
-                            localActiveTraversers.add(traverser);
-                    }
+                    else if (isRemoteTraverser(traverser, traversalMatrix))
+                        remoteActiveTraversers.add(traverser.detach());
+                    else
+                        localActiveTraversers.add(traverser);
                 }
             }
             assert toProcessTraversers.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index d4daaac..4479306 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -297,7 +297,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         if (voteToHalt) {
             // local traverser sets to process
             final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+            // traversers that need to be sent back to the workers (no longer can be processed
locally by the master traversal)
             final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
+            // halted traversers that have completed their journey
             final TraverserSet<Object> haltedTraversers = memory.get(HALTED_TRAVERSERS);
             // get all barrier traversers
             final Set<String> completedBarriers = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index f833b6f..5798af0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -36,7 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceElement;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -65,19 +63,18 @@ final class WorkerExecutor {
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final TraverserSet<Object> activeTraversers = new TraverserSet<>();
         final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-        final boolean isTesting = Boolean.valueOf(System.getProperty("is.testing", "false"));
 
         ////////////////////////////////
         // GENERATE LOCAL TRAVERSERS //
         ///////////////////////////////
 
-        // some memory systems are interacted by multiple threads and thus, concurrent modification
can happen at iterator.remove()
+        // these are traversers that are going from OLTP (master) to OLAP (workers)
+        // these traversers were broadcasted from the master traversal to the workers for
attachment
+        final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+        // some memory systems are interacted with by multiple threads and thus, concurrent
modification can happen at iterator.remove().
         // its better to reduce the memory footprint and shorten the active traverser list
so synchronization is worth it.
-        // most distributed OLAP systems have the memory partitioned and thus, this synchronization
does nothing
-        synchronized (memory) {
-            // these are traversers that are going from OLTP to OLAP
-            // these traversers were broadcasted from the master traversal to the workers
for attachment
-            final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+        // most distributed OLAP systems have the memory partitioned and thus, this synchronization
does nothing.
+        synchronized (maybeActiveTraversers) {
             final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
             while (iterator.hasNext()) {
                 final Traverser.Admin<Object> traverser = iterator.next();
@@ -97,20 +94,19 @@ final class WorkerExecutor {
                 traverser.setSideEffects(traversalSideEffects);
                 toProcessTraversers.add(traverser);
             });
+            assert previousActiveTraversers.isEmpty();
+            // remove the property to save space
             vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
         });
         // these are traversers that have been messaged to the vertex from another vertex
         final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
         while (messages.hasNext()) {
             IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser
-> {
-                // this is internal testing to ensure that messaged elements are always ReferenceXXX
and not DetachedXXX (related to HaltedTraverserFactoryStrategy)
-                if (isTesting && !(messenger instanceof SingleMessenger) &&
traverser.get() instanceof Element)
-                    assert traverser.get() instanceof ReferenceElement;
                 if (traverser.isHalted()) {
                     if (returnHaltedTraversers)
                         memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser,
haltedTraverserFactory)));
                     else
-                        haltedTraversers.add(traverser);
+                        haltedTraversers.add(traverser.detach());
                 } else {
                     // traverser is not halted and thus, should be processed locally
                     traverser.attach(Attachable.Method.get(vertex));
@@ -132,6 +128,7 @@ final class WorkerExecutor {
                 final Traverser.Admin<Object> traverser = traversers.next();
                 traversers.remove();
                 final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
+                // try and fill up the current step as much as possible with traversers to
get a bulking optimization
                 if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep
instanceof EmptyStep))
                     WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers,
memory, returnHaltedTraversers, haltedTraverserFactory);
                 currentStep.addStart(traverser);
@@ -150,12 +147,11 @@ final class WorkerExecutor {
                     if (traverser.get() instanceof Element || traverser.get() instanceof
Property) {      // GRAPH OBJECT
                         // if the element is remote, then message, else store it locally
for re-processing
                         final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get());
-                        if (!vertex.equals(hostingVertex)) { // necessary for path access
-                            voteToHalt.set(false);
+                        if (!vertex.equals(hostingVertex)) { // if its host is not the current
vertex, then send the traverser to the hosting vertex
+                            voteToHalt.set(false); // if message is passed, then don't vote
to halt
                             messenger.sendMessage(MessageScope.Global.of(hostingVertex),
new TraverserSet<>(traverser.detach()));
                         } else {
-                            if (traverser.get() instanceof Attachable)   // necessary for
path access to local object
-                                traverser.attach(Attachable.Method.get(vertex));
+                            traverser.attach(Attachable.Method.get(vertex));
                             toProcessTraversers.add(traverser);
                         }
                     } else                                                              
               // STANDARD OBJECT
@@ -195,7 +191,7 @@ final class WorkerExecutor {
                             else
                                 haltedTraversers.add(traverser.detach());
                         } else
-                            localBarrierTraversers.add(traverser);
+                            localBarrierTraversers.add(traverser.detach());
                     });
                 }
                 memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index d643cd4..d0f50d0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -109,7 +109,7 @@ import java.util.stream.Stream;
         test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
         method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX",
         reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins
are required. This particular tests takes many minutes to execute.",
-        computers = {"ALL"})
+        computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
 // this is a nasty long test, just do it once in Java MatchTest
 @Graph.OptOut(
         test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
         method = "g_V_matchXa_0sungBy_b__a_0writtenBy_c__b_writtenBy_d__c_sungBy_d__d_hasXname_GarciaXX",

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
index af0b3b7..43bc94e 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
@@ -23,10 +23,12 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferencePath;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty;
@@ -64,6 +66,7 @@ public class HaltedTraverserFactoryStrategyTest {
         g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(DetachedProperty.class,
property.getClass()));
         g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class,
value.getClass()));
         g.V().out().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class,
vertex.getClass()));
+        g.V().out().out().path().forEachRemaining(path -> assertEquals(DetachedPath.class,
path.getClass()));
     }
 
     @Test
@@ -77,7 +80,8 @@ public class HaltedTraverserFactoryStrategyTest {
         g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class,
property.getClass()));
         g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class,
value.getClass()));
         g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class,
vertex.getClass()));
-        //
+        g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class,
path.getClass()));
+        // the default should be reference elements
         g = graph.traversal().withComputer();
         g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
         g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class,
vertexProperty.getClass()));
@@ -86,6 +90,7 @@ public class HaltedTraverserFactoryStrategyTest {
         g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class,
property.getClass()));
         g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class,
value.getClass()));
         g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class,
vertex.getClass()));
+        g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class,
path.getClass()));
     }
 
 }


Mime
View raw message