tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject incubator-tinkerpop git commit: fixed HALTED_TRAVERSER bug where traversers were not at their 'resting location'. This only shows up in multi-OLAP job chains. HALTED_TRAVERSERS (master traversal) are now propagated via Configuration and NOT via sideEffec
Date Wed, 18 May 2016 22:15:59 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1305 [created] 4ee302899


Fixed HALTED_TRAVERSER bug where traversers were not at their 'resting location'. This only shows up in multi-OLAP job chains. HALTED_TRAVERSERS (master traversal) are now propagated via Configuration and NOT via sideEffects. This is more elegant, and there are now helper methods in TraversalVertexProgram to make getting master halted traversers easy. Updated the-traversal.asciidoc to reflect how to use ProgramStep with it. Had to change a method signature in VertexComputing to account for this... it's slight (breaking), but no one is using this right now, I'm sure of it. And if they are, the change is trivial: just add a new argument to the method signature and ignore it to make it behave as it does in 3.2.0. ImmutablePath.TailPath was not serializable and this caused issues for HALTED_TRAVERSERS as well. Everything else fixed up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ee30289
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ee30289
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ee30289

Branch: refs/heads/TINKERPOP-1305
Commit: 4ee302899da37a0ad0ebab6bc4954ed39b69d6d9
Parents: c61095f
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed May 18 16:15:40 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed May 18 16:15:40 2016 -0600

----------------------------------------------------------------------
 docs/src/reference/the-traversal.asciidoc       | 23 +++++++--
 .../process/computer/GiraphGraphComputer.java   |  6 +++
 .../traversal/MemoryTraversalSideEffects.java   |  1 -
 .../traversal/TraversalVertexProgram.java       | 50 +++++++++++++++++---
 .../computer/traversal/TraverserExecutor.java   | 21 +++++---
 .../traversal/step/VertexComputing.java         |  8 +++-
 .../step/map/PageRankVertexProgramStep.java     |  7 +--
 .../step/map/PeerPressureVertexProgramStep.java | 11 ++++-
 .../step/map/ProgramVertexProgramStep.java      | 10 +++-
 .../step/map/TraversalVertexProgramStep.java    | 13 +++--
 .../traversal/step/map/VertexProgramStep.java   | 21 +++-----
 .../optimization/GraphFilterStrategy.java       |  3 +-
 .../traversal/step/util/ImmutablePath.java      |  3 +-
 .../step/map/GroovyPageRankTest.groovy          |  5 ++
 .../traversal/step/map/PageRankTest.java        | 22 +++++++++
 .../process/traversal/step/map/ProgramTest.java | 38 ++++++++-------
 .../process/computer/SparkGraphComputer.java    |  7 ++-
 .../optimization/SparkInterceptorStrategy.java  |  5 +-
 .../SparkSingleIterationStrategy.java           |  4 +-
 19 files changed, 185 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/docs/src/reference/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/the-traversal.asciidoc b/docs/src/reference/the-traversal.asciidoc
index 71708e2..3394e90 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -1436,6 +1436,8 @@ are provided below.
 
 [source,java]
 ----
+private TraverserSet<Object> haltedTraversers;
+
 public void loadState(final Graph graph, final Configuration configuration) {
   VertexProgram.super.loadState(graph, configuration);
   this.traversal = PureTraversal.loadState(configuration, VertexProgramStep.ROOT_TRAVERSAL, graph);
@@ -1444,18 +1446,29 @@ public void loadState(final Graph graph, final Configuration configuration) {
   this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
   // if master-traversal traversers may be propagated, create a memory compute key
   this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, Operator.addAll, false, false));
+  // returns an empty traverser set if there are no halted traversers
+  this.haltedTraversers = TraversalVertexProgram.getHaltedTraversers(configuration);
+}
+
+public void storeState(final Configuration configuration) {
+  VertexProgram.super.storeState(configuration);
+  // if halted traversers is null or empty, it does nothing
+  TraversalVertexProgram.storeHaltedTraversers(configuration, this.haltedTraversers);
 }
 
 public void setup(final Memory memory) {
-  if(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
-    // haltedTraversers in setup() represent master-traversal traversers
-    // for example, from a traversal of the form g.V().groupCount().program(...)
-    TraverserSet<Object> haltedTraversers = sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-    // do as you please with this information
+  if(null != this.haltedTraversers) {
+    // do what you like with the halted master traversal traversers
   }
+  // once used, no need to keep that information around (master)
+  if(null != this.haltedTraversers)
+    this.haltedTraversers.clear()
 }
 
 public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+  // once used, no need to keep that information around (workers)
+  if(null != this.haltedTraversers)
+    this.haltedTraversers.clear()
   if(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
     // haltedTraversers in execute() represent worker-traversal traversers
     // for example, from a traversal of the form g.V().out().program(...)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 6bd2a04..012b9fc 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -58,9 +58,12 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.File;
 import java.io.NotSerializableException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
@@ -74,6 +77,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
     private MapMemory memory = new MapMemory();
     private boolean useWorkerThreadsInConfiguration;
+    private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
@@ -112,6 +116,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
         final BaseConfiguration apacheConfiguration = new BaseConfiguration();
         apacheConfiguration.setDelimiterParsingDisabled(true);
         vertexProgram.storeState(apacheConfiguration);
+        IteratorUtils.fill(apacheConfiguration.getKeys(), this.vertexProgramConfigurationKeys);
         ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, this.giraphConfiguration);
         this.vertexProgram.getMessageCombiner().ifPresent(combiner -> this.giraphConfiguration.setMessageCombinerClass(GiraphMessageCombiner.class));
         return this;
@@ -139,6 +144,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             // clear properties that should not be propagated in an OLAP chain
             apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
             apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+            this.vertexProgramConfigurationKeys.forEach(apacheConfiguration::clearProperty); // clear out vertex program specific configurations
             return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), this.memory.asImmutable());
         }, exec);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
index 1327a7f..23d33f1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
@@ -192,7 +192,6 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
                         traversal.getSideEffects();
         sideEffects.keys().
                 stream().
-                filter(key -> !key.equals(TraversalVertexProgram.HALTED_TRAVERSERS)).
                 forEach(key -> keys.add(MemoryComputeKey.of(key, sideEffects.getReducer(key), true, false)));
         return keys;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 4df5189..211a5e5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -33,6 +33,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.Computer
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -66,6 +67,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -102,6 +104,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     private PureTraversal<?, ?> traversal;
     private TraversalMatrix<?, ?> traversalMatrix;
     private final Set<MapReduce> mapReducers = new HashSet<>();
+    private TraverserSet<Object> haltedTraversers;
     private boolean returnHaltedTraversers = false;
 
     private TraversalVertexProgram() {
@@ -116,6 +119,32 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         return this.traversal;
     }
 
+    public static <R> TraverserSet<R> loadHaltedTraversers(final Configuration configuration) {
+        if (!configuration.containsKey(HALTED_TRAVERSERS))
+            return new TraverserSet<>();
+
+        final Object object = configuration.getProperty(HALTED_TRAVERSERS) instanceof String ?
+                VertexProgramHelper.deserialize(configuration, HALTED_TRAVERSERS) :
+                configuration.getProperty(HALTED_TRAVERSERS);
+        if (object instanceof Traverser.Admin)
+            return new TraverserSet<>((Traverser.Admin<R>) object);
+        else {
+            final TraverserSet<R> traverserSet = new TraverserSet<>();
+            traverserSet.addAll((Collection) object);
+            return traverserSet;
+        }
+    }
+
+    public static <R> void storeHaltedTraversers(final Configuration configuration, final TraverserSet<R> haltedTraversers) {
+        if (null != haltedTraversers && !haltedTraversers.isEmpty()) {
+            try {
+                VertexProgramHelper.serialize(haltedTraversers, configuration, HALTED_TRAVERSERS);
+            } catch (final Exception e) {
+                configuration.setProperty(HALTED_TRAVERSERS, haltedTraversers);
+            }
+        }
+    }
+
     @Override
     public void loadState(final Graph graph, final Configuration configuration) {
         if (!configuration.containsKey(TRAVERSAL))
@@ -125,6 +154,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             this.traversal.get().applyStrategies();
         /// traversal is compiled and ready to be introspected
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
+        // get any master-traversal halted traversers
+        this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
         // if results will be serialized out, don't save halted traversers across the cluster
         this.returnHaltedTraversers =
                 (this.traversal.get().getParent().asStep().getNextStep() instanceof ComputerResultStep || // if its just going to stream it out, don't distribute
@@ -158,6 +189,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     public void storeState(final Configuration configuration) {
         VertexProgram.super.storeState(configuration);
         this.traversal.storeState(configuration, TRAVERSAL);
+        TraversalVertexProgram.storeHaltedTraversers(configuration, this.haltedTraversers);
     }
 
     @Override
@@ -170,19 +202,17 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
         memory.set(COMPLETED_BARRIERS, new HashSet<>());
         // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the stream
-        if (sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
-            final TraverserSet<Object> haltedTraversers = sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+        if (null != this.haltedTraversers) {
             final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-            IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
+            IteratorUtils.removeOnNext(this.haltedTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
                 toProcessTraversers.add(traverser);
             });
             assert haltedTraversers.isEmpty(); // TODO: this should be empty
             final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
-            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers);
-            memory.set(HALTED_TRAVERSERS, haltedTraversers);
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers);
+            memory.set(HALTED_TRAVERSERS, this.haltedTraversers);
             memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
-            sideEffects.remove(TraversalVertexProgram.HALTED_TRAVERSERS);
         } else {
             memory.set(HALTED_TRAVERSERS, new TraverserSet<>());
             memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
@@ -196,6 +226,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     @Override
     public void execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final Memory memory) {
+        // if any global halted traversers, simply don't use them as they were handled by master setup()
+        if (null != this.haltedTraversers)
+            this.haltedTraversers = null;
         // memory is distributed
         MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
         // if a barrier was completed in another worker, it is also completed here (ensure distributed barriers are synchronized)
@@ -365,6 +398,11 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             super(TraversalVertexProgram.class);
         }
 
+        public Builder haltedTraversers(final TraverserSet<Object> haltedTraversers) {
+            TraversalVertexProgram.storeHaltedTraversers(this.configuration, haltedTraversers);
+            return this;
+        }
+
         public Builder traversal(final TraversalSource traversalSource, final String scriptEngine, final String traversalScript, final Object... bindings) {
             return this.traversal(new ScriptTraversal<>(traversalSource, scriptEngine, traversalScript, bindings));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 167924f..f5facdb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -87,9 +87,16 @@ public final class TraverserExecutor {
             while (traversers.hasNext()) {
                 final Traverser.Admin<Object> traverser = traversers.next();
                 traversers.remove();
-                traverser.attach(Attachable.Method.get(vertex));
-                traverser.setSideEffects(traversalSideEffects);
-                toProcessTraversers.add(traverser);
+                if (traverser.isHalted()) {
+                    if (returnHaltedTraversers)
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                    else
+                        haltedTraversers.add(traverser);
+                } else {
+                    traverser.attach(Attachable.Method.get(vertex));
+                    traverser.setSideEffects(traversalSideEffects);
+                    toProcessTraversers.add(traverser);
+                }
             }
         }
 
@@ -152,10 +159,10 @@ public final class TraverserExecutor {
                     final TraverserSet<Object> barrierSet = barrier.nextBarrier();
                     IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
                         traverser.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too
-                        if (traverser.isHalted()) {
+                        if (traverser.isHalted() && ((!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) || getHostingVertex(traverser.get()).equals(vertex))) {
                             traverser.detach();
                             if (returnHaltedTraversers)
-                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
                             else
                                 haltedTraversers.add(traverser);
                         } else {
@@ -174,10 +181,10 @@ public final class TraverserExecutor {
             }
         } else { // LOCAL PROCESSING
             step.forEachRemaining(traverser -> {
-                if (traverser.isHalted()) {
+                if (traverser.isHalted() && ((!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) || getHostingVertex(traverser.get()).equals(vertex))) {
                     traverser.detach();
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
                     else
                         haltedTraversers.add(traverser);
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
index 57e1a3e..b599c42 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/VertexComputing.java
@@ -21,9 +21,12 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.step;
 
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -48,10 +51,11 @@ public interface VertexComputing {
     /**
      * Generate the {@link VertexProgram}.
      *
-     * @param graph the {@link Graph} that the program will be executed over.
+     * @param graph          the {@link Graph} that the program will be executed over.
+     * @param memoryOptional a reference to a {@link Memory} from the previous OLAP job if it exists.
      * @return the generated vertex program instance.
      */
-    public VertexProgram generateProgram(final Graph graph);
+    public VertexProgram generateProgram(final Graph graph, final Optional<Memory> memoryOptional);
 
     /**
      * @deprecated As of release 3.2.1. Please use {@link VertexComputing#getComputer()}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
index 5d10e67..660f5b8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
@@ -19,8 +19,9 @@
 
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.lambda.HaltedTraversersCountTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
@@ -37,8 +38,8 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -83,7 +84,7 @@ public final class PageRankVertexProgramStep extends VertexProgramStep implement
     }
 
     @Override
-    public PageRankVertexProgram generateProgram(final Graph graph) {
+    public PageRankVertexProgram generateProgram(final Graph graph, final Optional<Memory> memoryOptional) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
         detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
         final PageRankVertexProgram.Builder builder = PageRankVertexProgram.build()

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
index f42ce93..2ae36a5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
@@ -19,7 +19,7 @@
 
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -84,9 +85,15 @@ public final class PeerPressureVertexProgramStep extends VertexProgramStep imple
     }
 
     @Override
-    public PeerPressureVertexProgram generateProgram(final Graph graph) {
+    public PeerPressureVertexProgram generateProgram(final Graph graph, final Optional<Memory> memoryOptional) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
         detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
+        /*
+                memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
+                builder.configure(TraversalVertexProgram.HALTED_TRAVERSERS, memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+        });
+         */
         return PeerPressureVertexProgram.build()
                 .property(this.clusterProperty)
                 .maxIterations(this.times)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
index a1342b6..d060c99 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
@@ -20,7 +20,9 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.commons.configuration.MapConfiguration;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -29,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -48,11 +51,16 @@ public final class ProgramVertexProgramStep extends VertexProgramStep {
     }
 
     @Override
-    public VertexProgram generateProgram(final Graph graph) {
+    public VertexProgram generateProgram(final Graph graph, final Optional<Memory> memoryOptional) {
         final MapConfiguration base = new MapConfiguration(this.configuration);
         base.setDelimiterParsingDisabled(true);
         PureTraversal.storeState(base, ROOT_TRAVERSAL, TraversalHelper.getRootTraversal(this.getTraversal()).clone());
         base.setProperty(STEP_ID, this.getId());
+        memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
+                TraversalVertexProgram.storeHaltedTraversers(base, memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+            }
+        });
         return VertexProgram.createVertexProgram(graph, base);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index 64b9097..5851f35 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -32,6 +33,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -62,15 +64,18 @@ public final class TraversalVertexProgramStep extends VertexProgramStep implemen
     }
 
     @Override
-    public TraversalVertexProgram generateProgram(final Graph graph) {
+    public TraversalVertexProgram generateProgram(final Graph graph, final Optional<Memory> memoryOptional) {
         final Traversal.Admin<?, ?> computerSpecificTraversal = this.computerTraversal.getPure();
         computerSpecificTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()).clone());
         this.getTraversal().getStrategies().toList().forEach(computerSpecificTraversal.getStrategies()::addStrategies);
         computerSpecificTraversal.setSideEffects(new MemoryTraversalSideEffects(this.getTraversal().getSideEffects()));
         computerSpecificTraversal.setParent(this);
-        return TraversalVertexProgram.build()
-                .traversal(computerSpecificTraversal)
-                .create(graph);
+        final TraversalVertexProgram.Builder builder = TraversalVertexProgram.build().traversal(computerSpecificTraversal);
+        memoryOptional.ifPresent(memory -> {
+            if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
+                builder.haltedTraversers(memory.get(TraversalVertexProgram.HALTED_TRAVERSERS));
+        });
+        return builder.create(graph);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index c535d30..ac3de30 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.VertexComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -33,12 +32,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
 
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -66,14 +64,15 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
             if (this.first && this.getPreviousStep() instanceof EmptyStep) {
                 this.first = false;
                 final Graph graph = this.getTraversal().getGraph().get();
-                future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                future = this.generateComputer(graph).program(this.generateProgram(graph, Optional.empty())).submit();
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
             } else {
                 final Traverser.Admin<ComputerResult> traverser = this.starts.next();
                 final Graph graph = traverser.get().graph();
-                future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                final Memory memory = traverser.get().memory();
+                future = this.generateComputer(graph).program(this.generateProgram(graph, Optional.of(memory))).submit();
                 final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return traverser.split(result, this);
@@ -122,21 +121,13 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
         final TraversalSideEffects sideEffects = this.getTraversal().getSideEffects();
         for (final String key : memory.keys()) {
             if (sideEffects.exists(key)) {
+                // halted traversers should never be propagated through sideEffects
+                assert !key.equals(TraversalVertexProgram.HALTED_TRAVERSERS);
                 sideEffects.set(key, memory.get(key));
             }
         }
-        if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS) && !this.isEndStep()) {
-            final TraverserSet<Object> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-            if (!haltedTraversers.isEmpty()) {
-                if (sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
-                    sideEffects.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
-                else
-                    sideEffects.register(TraversalVertexProgram.HALTED_TRAVERSERS, new ConstantSupplier<>(haltedTraversers), Operator.addAll);
-            }
-        }
     }
 
-
     protected boolean isEndStep() {
         return this.getNextStep() instanceof ComputerResultStep || (this.getNextStep() instanceof ProfileStep && this.getNextStep().getNextStep() instanceof ComputerResultStep);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
index 90819d7..e4cf5ff 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/GraphFilterStrategy.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -59,7 +60,7 @@ public final class GraphFilterStrategy extends AbstractTraversalStrategy<Travers
             return;
         final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // given that this strategy only works for single OLAP jobs, the graph is the traversal graph
         for (final TraversalVertexProgramStep step : TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) {   // will be zero or one step
-            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             final Computer computer = step.getComputer();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
index 1a916ca..1e936c8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ImmutablePath.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.util;
 
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Pop;
-import org.apache.tinkerpop.gremlin.structure.Element;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -177,7 +176,7 @@ public class ImmutablePath implements Path, ImmutablePathImpl, Serializable, Clo
     }
 
 
-    private static class TailPath implements Path, ImmutablePathImpl {
+    private static class TailPath implements Path, ImmutablePathImpl, Serializable {
         private static final TailPath INSTANCE = new TailPath();
 
         private TailPath() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
index bda6819..e27e8c0 100644
--- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
+++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPageRankTest.groovy
@@ -69,5 +69,10 @@ public abstract class GroovyPageRankTest {
         public Traversal<Vertex, Map<Object, List<Vertex>>> get_g_V_outXcreatedX_groupXmX_byXlabelX_pageRankX1X_byXpageRankX_byXinEX_timesX1X_inXcreatedX_groupXmX_byXpageRankX_capXmX() {
             new ScriptTraversal<>(g, "gremlin-groovy", "g.V.out('created').group('m').by(label).pageRank(1.0).by('pageRank').by(inE()).times(1).in('created').group('m').by('pageRank').cap('m')")
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX() {
+            new ScriptTraversal<>(g, "gremlin-groovy", "g.V.out('created').pageRank().by(bothE()).by('projectRank').valueMap('name','projectRank')")
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
index df4b802..5a3ed9b 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PageRankTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -48,6 +49,8 @@ public abstract class PageRankTest extends AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Vertex> get_g_V_pageRank();
 
+    public abstract Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX();
+
     public abstract Traversal<Vertex, String> get_g_V_pageRank_order_byXpageRank_decrX_name();
 
     public abstract Traversal<Vertex, String> get_g_V_pageRank_order_byXpageRank_decrX_name_limitX2X();
@@ -78,6 +81,20 @@ public abstract class PageRankTest extends AbstractGremlinProcessTest {
 
     @Test
     @LoadGraphWith(MODERN)
+    public void g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX() {
+        final Traversal<Vertex, Map<String, Object>> traversal = get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX();
+        printTraversalForm(traversal);
+        final Map<String, Double> map = new HashMap<>();
+        traversal.forEachRemaining(m -> map.put(((List<String>) m.get("name")).get(0), ((List<Double>) m.get("projectRank")).get(0)));
+        assertEquals(2, map.size());
+        assertTrue(map.containsKey("lop"));
+        assertTrue(map.containsKey("ripple"));
+        assertTrue(map.get("lop") > map.get("ripple"));
+        assertFalse(traversal.hasNext());
+    }
+
+    @Test
+    @LoadGraphWith(MODERN)
     public void g_V_pageRank_order_byXpageRank_decrX_name() {
         final Traversal<Vertex, String> traversal = get_g_V_pageRank_order_byXpageRank_decrX_name();
         printTraversalForm(traversal);
@@ -225,6 +242,11 @@ public abstract class PageRankTest extends AbstractGremlinProcessTest {
         }
 
         @Override
+        public Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_pageRank_byXbothEX_byXprojectRankX_valueMapXname_projectRankX() {
+            return g.V().out("created").pageRank().by(__.bothE()).by("projectRank").valueMap("name", "projectRank");
+        }
+
+        @Override
         public Traversal<Vertex, Map<String, List<Object>>> get_g_V_pageRank_byXoutEXknowsXX_byXfriendRankX_valueMapXname_friendRankX() {
             return g.V().pageRank().by(__.outE("knows")).by("friendRank").valueMap("name", "friendRank");
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
index 2c505e7..8b2a293 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
@@ -59,6 +59,7 @@ import java.util.Set;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -160,6 +161,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
     public static class TestProgram implements VertexProgram {
 
         private PureTraversal<?, ?> traversal = new PureTraversal<>(EmptyTraversal.instance());
+        private TraverserSet<Object> haltedTraversers;
         private Step programStep = EmptyStep.instance();
 
         private final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
@@ -168,6 +170,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         public void loadState(final Graph graph, final Configuration configuration) {
             VertexProgram.super.loadState(graph, configuration);
             this.traversal = PureTraversal.loadState(configuration, VertexProgramStep.ROOT_TRAVERSAL, graph);
+            this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
             this.programStep = new TraversalMatrix<>(this.traversal.get()).getStepById(configuration.getString(ProgramVertexProgramStep.STEP_ID));
             this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
             this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, Operator.addAll, false, false));
@@ -178,24 +181,24 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         public void storeState(final Configuration configuration) {
             VertexProgram.super.storeState(configuration);
             this.traversal.storeState(configuration, VertexProgramStep.ROOT_TRAVERSAL);
+            TraversalVertexProgram.storeHaltedTraversers(configuration, this.haltedTraversers);
             configuration.setProperty(ProgramVertexProgramStep.STEP_ID, this.programStep.getId());
         }
 
         @Override
         public void setup(final Memory memory) {
             MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.SETUP);
-            final TraversalSideEffects sideEffects = this.traversal.get().getSideEffects();
-            final TraverserSet<Object> haltedTraversers = sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
-            sideEffects.remove(TraversalVertexProgram.HALTED_TRAVERSERS);
-            this.checkSideEffects(MemoryTraversalSideEffects.State.SETUP);
-            final Map<Vertex, Long> map = (Map<Vertex, Long>) haltedTraversers.iterator().next().get();
+            final Map<Vertex, Long> map = (Map<Vertex, Long>) this.haltedTraversers.iterator().next().get();
             assertEquals(2, map.size());
             assertTrue(map.values().contains(3l));
             assertTrue(map.values().contains(1l));
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
-            map.keySet().forEach(vertex -> activeTraversers.add(haltedTraversers.peek().split(vertex, EmptyStep.instance())));
+            map.keySet().forEach(vertex -> activeTraversers.add(this.haltedTraversers.peek().split(vertex, EmptyStep.instance())));
+            this.haltedTraversers.clear();
+            this.checkSideEffects();
             memory.set(TraversalVertexProgram.ACTIVE_TRAVERSERS, activeTraversers);
 
+
         }
 
         @Override
@@ -203,7 +206,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
             MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
-            this.checkSideEffects(MemoryTraversalSideEffects.State.EXECUTE);
+            this.checkSideEffects();
             final TraverserSet<Vertex> activeTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
             if (vertex.label().equals("software")) {
                 assertEquals(1, activeTraversers.stream().filter(v -> v.get().equals(vertex)).count());
@@ -231,7 +234,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         public boolean terminate(final Memory memory) {
             final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
             MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.TERMINATE);
-            checkSideEffects(MemoryTraversalSideEffects.State.TERMINATE);
+            checkSideEffects();
             if (memory.isInitialIteration()) {
                 assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
                 return false;
@@ -248,16 +251,18 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
 
         @Override
         public void workerIterationStart(final Memory memory) {
+            assertNotNull(this.haltedTraversers);
+            this.haltedTraversers.clear();
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
-            checkSideEffects(MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
+            checkSideEffects();
         }
 
         @Override
         public void workerIterationEnd(final Memory memory) {
             assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
-            checkSideEffects(MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
+            checkSideEffects();
         }
 
         @Override
@@ -299,16 +304,13 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
 
         ////////
 
-        private void checkSideEffects(final MemoryTraversalSideEffects.State state) {
+        private void checkSideEffects() {
+            assertEquals(0, this.haltedTraversers.size());
+            assertTrue(this.haltedTraversers.isEmpty());
             final TraversalSideEffects sideEffects = this.traversal.get().getSideEffects();
             assertTrue(sideEffects instanceof MemoryTraversalSideEffects);
-            if (state.masterState()) {
-                assertEquals(1, sideEffects.keys().size());
-                assertFalse(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            } else {
-                assertEquals(2, sideEffects.keys().size());
-                assertTrue(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
-            }
+            assertEquals(1, sideEffects.keys().size());
+            assertFalse(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
             assertTrue(sideEffects.exists("x"));
             final BulkSet<String> bulkSet = sideEffects.get("x");
             assertEquals(4, bulkSet.size());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9a9e934..9e05e53 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -264,14 +264,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                             throw new IllegalStateException(e.getMessage());
                         }
                     } else {  // standard GraphComputer semantics
+                        // get a configuration that will be propagated to all workers
+                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+                        this.vertexProgram.storeState(vertexProgramConfiguration);
                         // set up the vertex program and wire up configurations
                         this.vertexProgram.setup(memory);
                         JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
                         memory.broadcastMemory(sparkContext);
-                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
-                        this.vertexProgram.storeState(vertexProgramConfiguration);
-                        ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
-                        ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                         // execute the vertex program
                         while (true) {
                             if (Thread.interrupted()) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index 19d21b3..a2cc518 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
@@ -29,6 +30,8 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.op
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -43,7 +46,7 @@ public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<Tr
     public void apply(final Traversal.Admin<?, ?> traversal) {
         final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
         for (final TraversalVertexProgramStep step : TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) {
-            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             if (SparkStarBarrierInterceptor.isLegal(computerTraversal)) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ee30289/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index 153282e..280c7ba 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -36,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -59,7 +61,7 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
     public void apply(final Traversal.Admin<?, ?> traversal) {
         final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
         for (final TraversalVertexProgramStep step : TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal)) {
-            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph).getTraversal().get().clone();
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, Optional.empty()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
             boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal);


Mime
View raw message