tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From spmalle...@apache.org
Subject [08/27] incubator-tinkerpop git commit: added traversal-based vote strength test to PeerPressureTest. Cleaned up internal classes of TraversalVertexProgram a bit for better organization. Generalized Phase enum so it can be used by other computer-based cl
Date Thu, 26 May 2016 17:50:34 GMT
added traversal-based vote strength test to PeerPressureTest. Cleaned up internal classes of TraversalVertexProgram a bit for better organization. Generalized Phase enum so it can be used by other computer-based classes. added comments to TraversalVertexProgram.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5bcf0b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5bcf0b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5bcf0b01

Branch: refs/heads/TINKERPOP-1308
Commit: 5bcf0b01c7bfc77e9eea8fdb7b2c4c54a4fc0000
Parents: b100f03
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Thu May 19 05:23:36 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu May 19 05:23:36 2016 -0600

----------------------------------------------------------------------
 docs/src/reference/the-traversal.asciidoc       |  17 +-
 .../gremlin/process/computer/ProgramPhase.java  |  40 ++++
 .../peerpressure/PeerPressureVertexProgram.java |  21 +-
 .../traversal/MemoryTraversalSideEffects.java   |  27 +--
 .../computer/traversal/SingleMessenger.java     |  49 -----
 .../traversal/TraversalVertexProgram.java       |  18 +-
 .../computer/traversal/TraverserExecutor.java   | 216 ------------------
 .../computer/traversal/WorkerExecutor.java      | 220 +++++++++++++++++++
 .../step/map/PeerPressureVertexProgramStep.java |   9 +-
 .../process/computer/util/EmptyMemory.java      |  58 ++++-
 .../process/computer/util/SingleMessenger.java  |  49 +++++
 .../step/map/GroovyPeerPressureTest.groovy      |   5 +
 .../traversal/step/map/PeerPressureTest.java    |  31 +++
 .../process/traversal/step/map/ProgramTest.java |  11 +-
 .../SparkStarBarrierInterceptor.java            |   3 +-
 15 files changed, 460 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/docs/src/reference/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/the-traversal.asciidoc b/docs/src/reference/the-traversal.asciidoc
index 3394e90..4c74795 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -1434,6 +1434,10 @@ vertex program includes:
 The user supplied `VertexProgram` can leverage that information accordingly within their vertex program. Example uses
 are provided below.
 
+WARNING: Developing a `VertexProgram` is for expert users. Moreover, developing one that can be used effectively within
+a traversal requires yet more expertise. This information is recommended for advanced users with a deep understanding of the
+mechanics of Gremlin OLAP (<<graphcomputer,`GraphComputer`>>).
+
 [source,java]
 ----
 private TraverserSet<Object> haltedTraversers;
@@ -1447,7 +1451,7 @@ public void loadState(final Graph graph, final Configuration configuration) {
   // if master-traversal traversers may be propagated, create a memory compute key
   this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, Operator.addAll, false, false));
   // returns an empty traverser set if there are no halted traversers
-  this.haltedTraversers = TraversalVertexProgram.getHaltedTraversers(configuration);
+  this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
 }
 
 public void storeState(final Configuration configuration) {
@@ -1457,22 +1461,21 @@ public void storeState(final Configuration configuration) {
 }
 
 public void setup(final Memory memory) {
-  if(null != this.haltedTraversers) {
+  if(!this.haltedTraversers.isEmpty()) {
     // do what you like with the halted master traversal traversers
   }
   // once used, no need to keep that information around (master)
-  if(null != this.haltedTraversers)
-    this.haltedTraversers.clear()
+  this.haltedTraversers = null;
 }
 
 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
   // once used, no need to keep that information around (workers)
   if(null != this.haltedTraversers)
-    this.haltedTraversers.clear()
+    this.haltedTraversers = null;
   if(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
     // haltedTraversers in execute() represent worker-traversal traversers
     // for example, from a traversal of the form g.V().out().program(...)
-    TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS) :
+    TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
     // create a new halted traverser set that can be used by the next OLAP job in the chain
     // these are worker-traversers that are distributed throughout the graph
     TraverserSet<Object> newHaltedTraversers = new TraverserSet<>();
@@ -1480,7 +1483,7 @@ public void execute(final Vertex vertex, final Messenger messenger, final Memory
        newHaltedTraversers.add(traverser.split(traverser.get().toString(), this.programStep));
     });
     vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers);
-    // it is possible to create master-traversers that are localized to the master traversal (thread)
+    // it is possible to create master-traversers that are localized to the master traversal (this is how results are ultimately delivered back to the user)
     memory.add(TraversalVertexProgram.HALTED_TRAVERSERS,
                new TraverserSet<>(this.traversal().get().getTraverserGenerator().generate("an example", this.programStep, 1l)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
new file mode 100644
index 0000000..ce39505
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum ProgramPhase {
+
+    SETUP,
+    WORKER_ITERATION_START,
+    EXECUTE,
+    WORKER_ITERATION_END,
+    TERMINATE;
+
+    public boolean masterState() {
+        return this == SETUP || this == TERMINATE;
+    }
+
+    public boolean workerState() {
+        return !this.masterState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
index 8834882..56de255 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.ScriptTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -62,6 +63,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
 
     public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
     private static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
+    private static final String INITIAL_VOTE_STRENGTH_TRAVERSAL = "gremlin.pageRankVertexProgram.initialVoteStrengthTraversal";
     private static final String PROPERTY = "gremlin.peerPressureVertexProgram.property";
     private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
     private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
@@ -69,6 +71,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
     private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
 
     private PureTraversal<Vertex, Edge> edgeTraversal = null;
+    private PureTraversal<Vertex, ? extends Number> initialVoteStrengthTraversal = null;
     private int maxIterations = 30;
     private boolean distributeVote = false;
     private String property = CLUSTER;
@@ -81,6 +84,8 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
 
     @Override
     public void loadState(final Graph graph, final Configuration configuration) {
+        if (configuration.containsKey(INITIAL_VOTE_STRENGTH_TRAVERSAL))
+            this.initialVoteStrengthTraversal = PureTraversal.loadState(configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL, graph);
         if (configuration.containsKey(EDGE_TRAVERSAL)) {
             this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph);
             this.voteScope = MessageScope.Local.of(() -> this.edgeTraversal.get().clone());
@@ -99,6 +104,8 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
         configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
         if (null != this.edgeTraversal)
             this.edgeTraversal.storeState(configuration, EDGE_TRAVERSAL);
+        if (null != this.initialVoteStrengthTraversal)
+            this.initialVoteStrengthTraversal.storeState(configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL);
     }
 
     @Override
@@ -137,14 +144,19 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
             if (this.distributeVote) {
                 messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
             } else {
-                double voteStrength = 1.0d;
+                double voteStrength = (null == this.initialVoteStrengthTraversal ?
+                        1.0d :
+                        TraversalUtil.apply(vertex, this.initialVoteStrengthTraversal.get()).doubleValue());
                 vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
                 vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
                 messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
                 memory.add(VOTE_TO_HALT, false);
             }
         } else if (1 == memory.getIteration() && this.distributeVote) {
-            double voteStrength = 1.0d / IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
+            double voteStrength = (null == this.initialVoteStrengthTraversal ?
+                    1.0d :
+                    TraversalUtil.apply(vertex, this.initialVoteStrengthTraversal.get()).doubleValue()) /
+                    IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
             vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
             vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
             messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
@@ -227,6 +239,11 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
             return this;
         }
 
+        public Builder initialVoteStrength(final Traversal.Admin<Vertex, ? extends Number> initialVoteStrengthTraversal) {
+            PureTraversal.storeState(this.configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL, initialVoteStrengthTraversal);
+            return this;
+        }
+
         /**
          * @deprecated As of release 3.2.0, replaced by {@link PeerPressureVertexProgram.Builder#edges(Traversal.Admin)}
          */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
index 23d33f1..bf9f8c0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 
@@ -38,23 +39,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
 
     private TraversalSideEffects sideEffects;
     private Memory memory;
-    private State state;
-
-    public enum State {
-        SETUP,
-        WORKER_ITERATION_START,
-        EXECUTE,
-        WORKER_ITERATION_END,
-        TERMINATE;
-
-        public boolean masterState() {
-            return this == SETUP || this == TERMINATE;
-        }
-
-        public boolean workerState() {
-            return !this.masterState();
-        }
-    }
+    private ProgramPhase phase;
 
     private MemoryTraversalSideEffects() {
         // for serialization
@@ -93,7 +78,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
 
     @Override
     public void add(final String key, final Object value) {
-        if (this.state.workerState())
+        if (this.phase.workerState())
             this.memory.add(key, value);
         else
             this.memory.set(key, this.sideEffects.getReducer(key).apply(this.memory.get(key), value));
@@ -168,20 +153,20 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
     }
 
     public void storeSideEffectsInMemory() {
-        if (this.state.workerState())
+        if (this.phase.workerState())
             this.sideEffects.forEach(this.memory::add);
         else
             this.sideEffects.forEach(this.memory::set);
     }
 
-    public static void setMemorySideEffects(final Traversal.Admin<?, ?> traversal, final Memory memory, final State state) {
+    public static void setMemorySideEffects(final Traversal.Admin<?, ?> traversal, final Memory memory, final ProgramPhase phase) {
         final TraversalSideEffects sideEffects = traversal.getSideEffects();
         if (!(sideEffects instanceof MemoryTraversalSideEffects)) {
             traversal.setSideEffects(new MemoryTraversalSideEffects(sideEffects));
         }
         final MemoryTraversalSideEffects memoryTraversalSideEffects = ((MemoryTraversalSideEffects) traversal.getSideEffects());
         memoryTraversalSideEffects.memory = memory;
-        memoryTraversalSideEffects.state = state;
+        memoryTraversalSideEffects.phase = phase;
     }
 
     public static Set<MemoryComputeKey> getMemoryComputeKeys(final Traversal.Admin<?, ?> traversal) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
deleted file mode 100644
index 26ed8a4..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.traversal;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SingleMessenger<M> implements Messenger<M> {
-
-    private final Messenger<M> baseMessenger;
-    private final M message;
-
-    public SingleMessenger(final Messenger<M> baseMessenger, final M message) {
-        this.baseMessenger = baseMessenger;
-        this.message = message;
-    }
-
-    @Override
-    public Iterator<M> receiveMessages() {
-        return IteratorUtils.of(this.message);
-    }
-
-    @Override
-    public void sendMessage(final MessageScope messageScope, final M message) {
-        this.baseMessenger.sendMessage(messageScope, message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index c7e7ef9..266426f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -27,12 +27,14 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
@@ -195,13 +197,13 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void setup(final Memory memory) {
         // memory is local
-        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.SETUP);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.SETUP);
         ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).storeSideEffectsInMemory();
         memory.set(VOTE_TO_HALT, true);
         memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
         memory.set(COMPLETED_BARRIERS, new HashSet<>());
-        // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the stream
-        if (null != this.haltedTraversers && !this.haltedTraversers.isEmpty()) {
+        // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the flow
+        if (!this.haltedTraversers.isEmpty()) {
             final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
             IteratorUtils.removeOnNext(this.haltedTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
@@ -231,7 +233,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         if (null != this.haltedTraversers)
             this.haltedTraversers = null;
         // memory is distributed
-        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.EXECUTE);
         // if a barrier was completed in another worker, it is also completed here (ensure distributed barriers are synchronized)
         final Set<String> completedBarriers = memory.get(COMPLETED_BARRIERS);
         for (final String stepId : completedBarriers) {
@@ -245,11 +247,13 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         //////////////////
         if (memory.isInitialIteration()) {    // ITERATION 1
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
+            // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (distributed traversers), get them into the flow
             IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
                 activeTraversers.add(traverser);
             });
             assert haltedTraversers.isEmpty();
+            // for g.V()/E()
             if (this.traversal.get().getStartStep() instanceof GraphStep) {
                 final GraphStep<Element, Element> graphStep = (GraphStep<Element, Element>) this.traversal.get().getStartStep();
                 graphStep.reset();
@@ -270,9 +274,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
+            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
         }
         if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
             vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
@@ -281,7 +285,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public boolean terminate(final Memory memory) {
         // memory is local
-        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.TERMINATE);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.TERMINATE);
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         memory.set(VOTE_TO_HALT, true);
         memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
deleted file mode 100644
index c5f9bb5..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.traversal;
-
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TraverserExecutor {
-
-    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final boolean returnHaltedTraversers) {
-
-        final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
-        final AtomicBoolean voteToHalt = new AtomicBoolean(true);
-        final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
-        final TraverserSet<Object> activeTraversers = new TraverserSet<>();
-        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-
-        ////////////////////////////////
-        // GENERATE LOCAL TRAVERSERS //
-        ///////////////////////////////
-
-        // these are traversers that are going from OLTP to OLAP
-        final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
-        final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
-        while (iterator.hasNext()) {
-            final Traverser.Admin<Object> traverser = iterator.next();
-            if (vertex.equals(TraverserExecutor.getHostingVertex(traverser.get()))) {
-                // iterator.remove(); ConcurrentModificationException
-                traverser.attach(Attachable.Method.get(vertex));
-                traverser.setSideEffects(traversalSideEffects);
-                toProcessTraversers.add(traverser);
-            }
-        }
-        // these are traversers that exist from from a local barrier
-        vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
-            IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
-                traverser.attach(Attachable.Method.get(vertex));
-                traverser.setSideEffects(traversalSideEffects);
-                toProcessTraversers.add(traverser);
-            });
-            vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
-        });
-        // these are traversers that have been messaged to the vertex
-        final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
-        while (messages.hasNext()) {
-            final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator();
-            while (traversers.hasNext()) {
-                final Traverser.Admin<Object> traverser = traversers.next();
-                traversers.remove();
-                if (traverser.isHalted()) {
-                    if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
-                    else
-                        haltedTraversers.add(traverser);
-                } else {
-                    traverser.attach(Attachable.Method.get(vertex));
-                    traverser.setSideEffects(traversalSideEffects);
-                    toProcessTraversers.add(traverser);
-                }
-            }
-        }
-
-        ///////////////////////////////
-        // PROCESS LOCAL TRAVERSERS //
-        //////////////////////////////
-
-        // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
-        while (!toProcessTraversers.isEmpty()) {
-            // process local traversers and if alive, repeat, else halt.
-            Step<Object, Object> previousStep = EmptyStep.instance();
-            Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
-            while (traversers.hasNext()) {
-                final Traverser.Admin<Object> traverser = traversers.next();
-                traversers.remove();
-                final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
-                if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
-                    TraverserExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
-                currentStep.addStart(traverser);
-                previousStep = currentStep;
-            }
-            TraverserExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
-            assert toProcessTraversers.isEmpty();
-            // process all the local objects and send messages or store locally again
-            if (!activeTraversers.isEmpty()) {
-                traversers = activeTraversers.iterator();
-                while (traversers.hasNext()) {
-                    final Traverser.Admin<Object> traverser = traversers.next();
-                    traversers.remove();
-                    if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
-                        // if the element is remote, then message, else store it locally for re-processing
-                        final Vertex hostingVertex = TraverserExecutor.getHostingVertex(traverser.get());
-                        if (!vertex.equals(hostingVertex)) { // necessary for path access
-                            voteToHalt.set(false);
-                            traverser.detach();
-                            messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
-                        } else {
-                            if (traverser.get() instanceof Attachable)   // necessary for path access to local object
-                                traverser.attach(Attachable.Method.get(vertex));
-                            toProcessTraversers.add(traverser);
-                        }
-                    } else                                                                              // STANDARD OBJECT
-                        toProcessTraversers.add(traverser);
-                }
-                assert activeTraversers.isEmpty();
-            }
-        }
-        return voteToHalt.get();
-    }
-
-    private static void drainStep(final Vertex vertex, final Step<Object, Object> step, final TraverserSet<Object> activeTraversers, final TraverserSet<Object> haltedTraversers, final Memory memory, final boolean returnHaltedTraversers) {
-        if (step instanceof Barrier) {
-            if (step instanceof Bypassing)
-                ((Bypassing) step).setBypass(true);
-            if (step instanceof LocalBarrier) {
-                final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
-                final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
-                vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet);
-                while (barrier.hasNextBarrier()) {
-                    final TraverserSet<Object> barrierSet = barrier.nextBarrier();
-                    IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
-                        traverser.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too
-                        if (traverser.isHalted() &&
-                                (returnHaltedTraversers ||
-                                        (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
-                                        getHostingVertex(traverser.get()).equals(vertex))) {
-                            traverser.detach();
-                            if (returnHaltedTraversers)
-                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
-                            else
-                                haltedTraversers.add(traverser);
-                        } else {
-                            traverser.detach();
-                            traverserSet.add(traverser);
-                        }
-                    });
-                }
-                memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
-            } else {
-                final Barrier barrier = (Barrier) step;
-                while (barrier.hasNextBarrier()) {
-                    memory.add(step.getId(), barrier.nextBarrier());
-                }
-                memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
-            }
-        } else { // LOCAL PROCESSING
-            step.forEachRemaining(traverser -> {
-                if (traverser.isHalted() &&
-                        (returnHaltedTraversers ||
-                                (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
-                                getHostingVertex(traverser.get()).equals(vertex))) {
-                    traverser.detach();
-                    if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
-                    else
-                        haltedTraversers.add(traverser);
-                } else {
-                    activeTraversers.add(traverser);
-                }
-            });
-        }
-    }
-
-    private static Vertex getHostingVertex(final Object object) {
-        Object obj = object;
-        while (true) {
-            if (obj instanceof Vertex)
-                return (Vertex) obj;
-            else if (obj instanceof Edge)
-                return ((Edge) obj).outVertex();
-            else if (obj instanceof Property)
-                obj = ((Property) obj).element();
-            else
-                throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
new file mode 100644
index 0000000..c4bd659
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.traversal;
+
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class WorkerExecutor {
+
+    private WorkerExecutor() {
+
+    }
+
+    protected static boolean execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final boolean returnHaltedTraversers) {
+
+        final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
+        final AtomicBoolean voteToHalt = new AtomicBoolean(true);
+        final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
+        final TraverserSet<Object> activeTraversers = new TraverserSet<>();
+        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+
+        ////////////////////////////////
+        // GENERATE LOCAL TRAVERSERS //
+        ///////////////////////////////
+
+        // these are traversers that are going from OLTP to OLAP
+        final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+        final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
+        while (iterator.hasNext()) {
+            final Traverser.Admin<Object> traverser = iterator.next();
+            if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
+                // iterator.remove(); ConcurrentModificationException
+                traverser.attach(Attachable.Method.get(vertex));
+                traverser.setSideEffects(traversalSideEffects);
+                toProcessTraversers.add(traverser);
+            }
+        }
+        // these are traversers that exist from a local barrier
+        vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
+            IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
+                traverser.attach(Attachable.Method.get(vertex));
+                traverser.setSideEffects(traversalSideEffects);
+                toProcessTraversers.add(traverser);
+            });
+            vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
+        });
+        // these are traversers that have been messaged to the vertex
+        final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
+        while (messages.hasNext()) {
+            final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator();
+            while (traversers.hasNext()) {
+                final Traverser.Admin<Object> traverser = traversers.next();
+                traversers.remove();
+                if (traverser.isHalted()) {
+                    if (returnHaltedTraversers)
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                    else
+                        haltedTraversers.add(traverser);
+                } else {
+                    traverser.attach(Attachable.Method.get(vertex));
+                    traverser.setSideEffects(traversalSideEffects);
+                    toProcessTraversers.add(traverser);
+                }
+            }
+        }
+
+        ///////////////////////////////
+        // PROCESS LOCAL TRAVERSERS //
+        //////////////////////////////
+
+        // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
+        while (!toProcessTraversers.isEmpty()) {
+            // process local traversers and if alive, repeat, else halt.
+            Step<Object, Object> previousStep = EmptyStep.instance();
+            Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
+            while (traversers.hasNext()) {
+                final Traverser.Admin<Object> traverser = traversers.next();
+                traversers.remove();
+                final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
+                if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
+                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
+                currentStep.addStart(traverser);
+                previousStep = currentStep;
+            }
+            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
+            assert toProcessTraversers.isEmpty();
+            // process all the local objects and send messages or store locally again
+            if (!activeTraversers.isEmpty()) {
+                traversers = activeTraversers.iterator();
+                while (traversers.hasNext()) {
+                    final Traverser.Admin<Object> traverser = traversers.next();
+                    traversers.remove();
+                    if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
+                        // if the element is remote, then message, else store it locally for re-processing
+                        final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get());
+                        if (!vertex.equals(hostingVertex)) { // necessary for path access
+                            voteToHalt.set(false);
+                            traverser.detach();
+                            messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
+                        } else {
+                            if (traverser.get() instanceof Attachable)   // necessary for path access to local object
+                                traverser.attach(Attachable.Method.get(vertex));
+                            toProcessTraversers.add(traverser);
+                        }
+                    } else                                                                              // STANDARD OBJECT
+                        toProcessTraversers.add(traverser);
+                }
+                assert activeTraversers.isEmpty();
+            }
+        }
+        return voteToHalt.get();
+    }
+
+    private static void drainStep(final Vertex vertex, final Step<Object, Object> step, final TraverserSet<Object> activeTraversers, final TraverserSet<Object> haltedTraversers, final Memory memory, final boolean returnHaltedTraversers) {
+        if (step instanceof Barrier) {
+            if (step instanceof Bypassing)
+                ((Bypassing) step).setBypass(true);
+            if (step instanceof LocalBarrier) {
+                final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
+                final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
+                vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet);
+                while (barrier.hasNextBarrier()) {
+                    final TraverserSet<Object> barrierSet = barrier.nextBarrier();
+                    IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
+                        traverser.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too
+                        if (traverser.isHalted() &&
+                                (returnHaltedTraversers ||
+                                        (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
+                                        getHostingVertex(traverser.get()).equals(vertex))) {
+                            traverser.detach();
+                            if (returnHaltedTraversers)
+                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                            else
+                                haltedTraversers.add(traverser);
+                        } else {
+                            traverser.detach();
+                            traverserSet.add(traverser);
+                        }
+                    });
+                }
+                memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
+            } else {
+                final Barrier barrier = (Barrier) step;
+                while (barrier.hasNextBarrier()) {
+                    memory.add(step.getId(), barrier.nextBarrier());
+                }
+                memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
+            }
+        } else { // LOCAL PROCESSING
+            step.forEachRemaining(traverser -> {
+                if (traverser.isHalted() &&
+                        (returnHaltedTraversers ||
+                                (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
+                                getHostingVertex(traverser.get()).equals(vertex))) {
+                    traverser.detach();
+                    if (returnHaltedTraversers)
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                    else
+                        haltedTraversers.add(traverser);
+                } else {
+                    activeTraversers.add(traverser);
+                }
+            });
+        }
+    }
+
+    private static Vertex getHostingVertex(final Object object) {
+        Object obj = object;
+        while (true) {
+            if (obj instanceof Vertex)
+                return (Vertex) obj;
+            else if (obj instanceof Edge)
+                return ((Edge) obj).outVertex();
+            else if (obj instanceof Property)
+                obj = ((Property) obj).element();
+            else
+                throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
index 7bd726e..0ea5112 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.lambda.HaltedTraversersCountTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -87,11 +88,13 @@ public final class PeerPressureVertexProgramStep extends VertexProgramStep imple
     public PeerPressureVertexProgram generateProgram(final Graph graph, final Memory memory) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
         detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
-        return PeerPressureVertexProgram.build()
+        final PeerPressureVertexProgram.Builder builder = PeerPressureVertexProgram.build()
                 .property(this.clusterProperty)
                 .maxIterations(this.times)
-                .edges(detachedTraversal)
-                .create(graph);
+                .edges(detachedTraversal);
+        if (this.previousTraversalVertexProgram())
+            builder.initialVoteStrength(new HaltedTraversersCountTraversal());
+        return builder.create(graph);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
index 72b1bbf..f513ee6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
@@ -21,19 +21,71 @@ package org.apache.tinkerpop.gremlin.process.computer.util;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class EmptyMemory {
+public final class EmptyMemory implements Memory.Admin {
 
-    private static final Memory INSTANCE = new ImmutableMemory(new MapMemory());
+    private static final EmptyMemory INSTANCE = new EmptyMemory();
 
     private EmptyMemory() {
 
     }
 
-    public static Memory instance() {
+    public static EmptyMemory instance() {
         return INSTANCE;
     }
 
+    @Override
+    public void setIteration(final int iteration) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public void setRuntime(final long runtime) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public Memory asImmutable() {
+        return this;
+    }
+
+    @Override
+    public Set<String> keys() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        throw Memory.Exceptions.memoryDoesNotExist(key);
+    }
+
+    @Override
+    public void set(final String key, final Object value) throws IllegalArgumentException, IllegalStateException {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public void add(final String key, final Object value) throws IllegalArgumentException, IllegalStateException {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public int getIteration() {
+        return 0;
+    }
+
+    @Override
+    public long getRuntime() {
+        return 0;
+    }
+
+    @Override
+    public boolean exists(final String key) {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
new file mode 100644
index 0000000..63ee282
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.util;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleMessenger<M> implements Messenger<M> {
+
+    private final Messenger<M> baseMessenger;
+    private final M message;
+
+    public SingleMessenger(final Messenger<M> baseMessenger, final M message) {
+        this.baseMessenger = baseMessenger;
+        this.message = message;
+    }
+
+    @Override
+    public Iterator<M> receiveMessages() {
+        return IteratorUtils.of(this.message);
+    }
+
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        this.baseMessenger.sendMessage(messageScope, message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
index eb1ad67..6ec0750 100644
--- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
+++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
@@ -39,5 +39,10 @@ public abstract class GroovyPeerPressureTest {
         public Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X() {
             new ScriptTraversal<>(g, "gremlin-groovy", "g.V.peerPressure.by('cluster').by(outE('knows')).pageRank(1.0).by('rank').by(outE('knows')).times(1).group.by('cluster').by(values('rank').sum).limit(100)")
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+            new ScriptTraversal<>(g, "gremlin-groovy", "g.V.has('name', 'ripple').in('created').peerPressure.by(outE()).by('cluster').repeat(union(identity(), both())).times(2).dedup.valueMap('name', 'cluster')")
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
index 21c4f43..5a2477c 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
@@ -27,7 +27,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +47,7 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X();
 
+    public abstract Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX();
 
     @Test
     @LoadGraphWith(MODERN)
@@ -72,6 +77,27 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
         assertEquals(0.0d, (double) map.get(convertToVertexId("peter")), 0.001d);
     }
 
+    @Test
+    @LoadGraphWith(MODERN)
+    public void g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+        final Traversal<Vertex, Map<String, List<Object>>> traversal = get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX();
+        printTraversalForm(traversal);
+        final List<Map<String, List<Object>>> results = traversal.toList();
+        assertEquals(6, results.size());
+        final Map<String, Object> clusters = new HashMap<>();
+        results.forEach(m -> clusters.put((String) m.get("name").get(0), m.get("cluster").get(0)));
+        assertEquals(2, results.get(0).size());
+        assertEquals(6, clusters.size());
+        assertEquals(clusters.get("josh"), clusters.get("ripple"));
+        assertEquals(clusters.get("josh"), clusters.get("lop"));
+        final Set<Object> ids = new HashSet<>(clusters.values());
+        assertEquals(4, ids.size());
+        assertTrue(ids.contains(convertToVertexId("marko")));
+        assertTrue(ids.contains(convertToVertexId("vadas")));
+        assertTrue(ids.contains(convertToVertexId("josh")));
+        assertTrue(ids.contains(convertToVertexId("peter")));
+    }
+
 
     public static class Traversals extends PeerPressureTest {
 
@@ -84,5 +110,10 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
         public Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X() {
             return g.V().peerPressure().by("cluster").by(__.outE("knows")).pageRank(1.0d).by("rank").by(__.outE("knows")).times(1).<Object, Number>group().by("cluster").by(__.values("rank").sum()).limit(100);
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+            return g.V().has("name", "ripple").in("created").peerPressure().by(__.outE()).by("cluster").repeat(__.union(__.identity(), __.both())).times(2).dedup().valueMap("name", "cluster");
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
index 8b2a293..5f548da 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -187,7 +188,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
 
         @Override
         public void setup(final Memory memory) {
-            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.SETUP);
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.SETUP);
             final Map<Vertex, Long> map = (Map<Vertex, Long>) this.haltedTraversers.iterator().next().get();
             assertEquals(2, map.size());
             assertTrue(map.values().contains(3l));
@@ -205,7 +206,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
-            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.EXECUTE);
             this.checkSideEffects();
             final TraverserSet<Vertex> activeTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
             if (vertex.label().equals("software")) {
@@ -233,7 +234,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         @Override
         public boolean terminate(final Memory memory) {
             final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
-            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.TERMINATE);
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.TERMINATE);
             checkSideEffects();
             if (memory.isInitialIteration()) {
                 assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
@@ -254,14 +255,14 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
             assertNotNull(this.haltedTraversers);
             this.haltedTraversers.clear();
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.WORKER_ITERATION_START);
             checkSideEffects();
         }
 
         @Override
         public void workerIterationEnd(final Memory memory) {
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.WORKER_ITERATION_END);
             checkSideEffects();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 6e35cf8..768d10a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
@@ -75,7 +76,7 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
         traversal.applyStrategies();                                // compile
         boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
         ///////////////////////////////
-        MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, MemoryTraversalSideEffects.State.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
+        MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, ProgramPhase.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
         memory.setInExecute(true);
         final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
                 .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)


Mime
View raw message