tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From spmalle...@apache.org
Subject [01/16] incubator-tinkerpop git commit: Wrote the most insane test in ProgramTest. It verifies that both master- and worker-haltedTraversers are accessible within a program(), that program().as(x) works, that sideEffects and labeled paths are accessible [Forced Update!]
Date Wed, 18 May 2016 13:11:08 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1274 4ab69c301 -> f5b64fafb (forced update)


Wrote the most insane test in ProgramTest. It verifies that both master- and worker-haltedTraversers are accessible within a program(), that program().as(x) works, and that sideEffects and labeled paths are accessible across OLAP jobs that include program(). Basically, epic. Had to work on TraversalVertexProgram a bunch as master-haltedTraversers were not being handled as expected in a multi-OLAP chain. Lots of good helper methods added to make it easy for people writing VertexPrograms for use in program() to leverage the traversal. It's still pretty painful as you have to be really aware of how OLAP traversals work… I don't think I can make it much simpler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/39d709d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/39d709d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/39d709d3

Branch: refs/heads/TINKERPOP-1274
Commit: 39d709d39e900ad2d8246d519ca13640c70610b1
Parents: 2cafe68
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed May 11 17:04:58 2016 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed May 11 17:04:58 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      | 158 ++++++++++++++++
 .../traversal/MemoryTraversalSideEffects.java   |  43 ++++-
 .../traversal/TraversalVertexProgram.java       | 147 +++------------
 .../step/map/ProgramVertexProgramStep.java      |   8 +-
 .../traversal/step/map/VertexProgramStep.java   |  17 +-
 .../decoration/VertexProgramStrategy.java       |   3 +-
 .../util/DefaultTraversalSideEffects.java       |   1 +
 .../traversal/step/map/GroovyProgramTest.groovy |   5 +
 .../process/traversal/step/map/ProgramTest.java | 180 +++++++++++++++++++
 .../SparkStarBarrierInterceptor.java            |  10 +-
 10 files changed, 435 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
new file mode 100644
index 0000000..ac0fc29
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal;
+
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LabelStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyKeyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyValueStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SackStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterExecutor {
+
+    private MasterExecutor() {
+
+    }
+
+    protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) {
+        if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
+            for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
+                final Step<Object, Object> step = traversalMatrix.getStepById(key);
+                if (null == step) continue;
+                assert step instanceof Barrier;
+                completedBarriers.add(step.getId());
+                if (!(step instanceof LocalBarrier)) {  // local barriers don't do any processing on the master traversal (they just lock on the workers)
+                    final Barrier<Object> barrier = (Barrier<Object>) step;
+                    barrier.addBarrier(memory.get(key));
+                    while (step.hasNext()) {
+                        traverserSet.add(step.next());
+                    }
+                    if (step instanceof ReducingBarrierStep)
+                        memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get());
+                }
+            }
+        }
+        memory.set(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>());
+    }
+
+    protected static void processTraversers(final PureTraversal<?, ?> traversal,
+                                            final TraversalMatrix<?, ?> traversalMatrix,
+                                            TraverserSet<Object> toProcessTraversers,
+                                            final TraverserSet<Object> remoteActiveTraversers,
+                                            final TraverserSet<Object> haltedTraversers) {
+
+
+        while (!toProcessTraversers.isEmpty()) {
+            final TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
+            Step<Object, Object> previousStep = EmptyStep.instance();
+            Step<Object, Object> currentStep = EmptyStep.instance();
+
+            final Iterator<Traverser.Admin<Object>> traversers = IteratorUtils.removeOnNext(toProcessTraversers.iterator());
+            while (traversers.hasNext()) {
+                final Traverser.Admin<Object> traverser = traversers.next();
+                traverser.set(DetachedFactory.detach(traverser.get(), true));
+                traverser.setSideEffects(traversal.get().getSideEffects());
+                if (traverser.isHalted()) {
+                    traverser.detach();
+                    haltedTraversers.add(traverser);
+                } else if (isRemoteTraverser(traverser, traversalMatrix)) {  // this is so that patterns like order().name work as expected.
+                    traverser.detach();
+                    remoteActiveTraversers.add(traverser);
+                } else {
+                    currentStep = traversalMatrix.getStepById(traverser.getStepId());
+                    if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep)) {
+                        while (previousStep.hasNext()) {
+                            final Traverser.Admin<Object> result = previousStep.next();
+                            if (result.isHalted()) {
+                                result.detach();
+                                haltedTraversers.add(result);
+                            } else {
+                                if (isRemoteTraverser(result, traversalMatrix)) {
+                                    result.detach();
+                                    remoteActiveTraversers.add(result);
+                                } else
+                                    localActiveTraversers.add(result);
+                            }
+                        }
+                    }
+                    currentStep.addStart(traverser);
+                    previousStep = currentStep;
+                }
+            }
+            if (!(currentStep instanceof EmptyStep)) {
+                while (currentStep.hasNext()) {
+                    final Traverser.Admin<Object> traverser = currentStep.next();
+                    if (traverser.isHalted()) {
+                        traverser.detach();
+                        haltedTraversers.add(traverser);
+                    } else {
+                        if (isRemoteTraverser(traverser, traversalMatrix)) {
+                            traverser.detach();
+                            remoteActiveTraversers.add(traverser);
+                        } else
+                            localActiveTraversers.add(traverser);
+                    }
+                }
+            }
+            assert toProcessTraversers.isEmpty();
+            toProcessTraversers = localActiveTraversers;
+        }
+    }
+
+    private static boolean isRemoteTraverser(final Traverser.Admin traverser, final TraversalMatrix<?, ?> traversalMatrix) {
+        return traverser.get() instanceof Attachable &&
+                !(traverser.get() instanceof Path) &&
+                !isLocalElement(traversalMatrix.getStepById(traverser.getStepId()));
+    }
+
+    // TODO: once this is complete (fully known), move to TraversalHelper
+    private static boolean isLocalElement(final Step<?, ?> step) {
+        return step instanceof PropertiesStep || step instanceof PropertyMapStep ||
+                step instanceof IdStep || step instanceof LabelStep || step instanceof SackStep ||
+                step instanceof PropertyKeyStep || step instanceof PropertyValueStep ||
+                step instanceof TailGlobalStep || step instanceof RangeGlobalStep || step instanceof HasStep;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
index dd86458..89f4f5c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
@@ -20,8 +20,11 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 
+import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BinaryOperator;
@@ -35,7 +38,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
 
     private TraversalSideEffects sideEffects;
     private Memory memory;
-    private boolean onExecute;
+    private boolean worker;
 
     private MemoryTraversalSideEffects() {
         // for serialization
@@ -46,11 +49,6 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
         this.memory = null;
     }
 
-    public void setMemory(final Memory memory, final boolean onExecute) {
-        this.memory = memory;
-        this.onExecute = onExecute;
-    }
-
     public TraversalSideEffects getSideEffects() {
         return this.sideEffects;
     }
@@ -64,7 +62,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
 
     @Override
     public <V> V get(final String key) throws IllegalArgumentException {
-        return null == this.memory ? this.sideEffects.get(key) : this.memory.get(key);
+        return (null != this.memory && this.memory.exists(key)) ? this.memory.get(key) : this.sideEffects.get(key);
     }
 
     @Override
@@ -79,7 +77,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
 
     @Override
     public void add(final String key, final Object value) {
-        if (this.onExecute)
+        if (this.worker)
             this.memory.add(key, value);
         else
             this.memory.set(key, this.sideEffects.getReducer(key).apply(this.memory.get(key), value));
@@ -152,4 +150,33 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
     public void mergeInto(final TraversalSideEffects sideEffects) {
         this.sideEffects.mergeInto(sideEffects);
     }
+
+    public void storeSideEffectsInMemory() {
+        if (this.worker)
+            this.sideEffects.forEach(this.memory::add);
+        else
+            this.sideEffects.forEach(this.memory::set);
+    }
+
+    public static void setMemorySideEffects(final Traversal.Admin<?, ?> traversal, final Memory memory, final boolean worker) {
+        final TraversalSideEffects sideEffects = traversal.getSideEffects();
+        if (!(sideEffects instanceof MemoryTraversalSideEffects)) {
+            traversal.setSideEffects(new MemoryTraversalSideEffects(sideEffects));
+        }
+        ((MemoryTraversalSideEffects) traversal.getSideEffects()).memory = memory;
+        ((MemoryTraversalSideEffects) traversal.getSideEffects()).worker = worker;
+    }
+
+    public static Set<MemoryComputeKey> getMemoryComputeKeys(final Traversal.Admin<?, ?> traversal) {
+        final Set<MemoryComputeKey> keys = new HashSet<>();
+        final TraversalSideEffects sideEffects =
+                traversal.getSideEffects() instanceof MemoryTraversalSideEffects ?
+                        ((MemoryTraversalSideEffects) traversal.getSideEffects()).getSideEffects() :
+                        traversal.getSideEffects();
+        sideEffects.keys().
+                stream().
+                filter(key -> !key.equals(TraversalVertexProgram.HALTED_TRAVERSERS)).
+                forEach(key -> keys.add(MemoryComputeKey.of(key, sideEffects.getReducer(key), true, false)));
+        return keys;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index f6ed25d..21d7f06 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -34,10 +34,8 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.Traversa
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
-import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -45,21 +43,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MemoryComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.LabelStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyKeyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyValueStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.SackStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSideEffectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
@@ -72,10 +59,8 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.util.function.MutableMetricsSupplier;
 import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -147,10 +132,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         (this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep && // same as above, but needed for profiling
                                 this.traversal.get().getParent().asStep().getNextStep().getNextStep() instanceof ComputerResultStep));
         // register traversal side-effects in memory
-        final TraversalSideEffects sideEffects = ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).getSideEffects();
-        sideEffects.keys().forEach(key -> this.memoryComputeKeys.add(MemoryComputeKey.of(key, sideEffects.getReducer(key), true, false)));
+        this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
         // register MapReducer memory compute keys
-        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));
         for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal.get())) {
             this.mapReducers.add(mapReducer.getMapReduce());
             this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), Operator.assign, false, false));
@@ -164,7 +147,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             this.traversal.get().getSideEffects().register(profileStep.getId(), new MutableMetricsSupplier(profileStep.getPreviousStep()), ProfileStep.ProfileBiOperator.instance());
         }
         // register TraversalVertexProgram specific memory compute keys
-        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, !this.returnHaltedTraversers)); // only keep if it will be preserved
+        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));
+        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, false));
         this.memoryComputeKeys.add(MemoryComputeKey.of(ACTIVE_TRAVERSERS, Operator.addAll, true, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(MUTATED_MEMORY_KEYS, Operator.addAll, false, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(COMPLETED_BARRIERS, Operator.addAll, true, true));
@@ -179,14 +163,28 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void setup(final Memory memory) {
         // memory is local
-        ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).setMemory(memory, false);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, false);
+        final MemoryTraversalSideEffects sideEffects = ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects());
+        sideEffects.storeSideEffectsInMemory();
         memory.set(VOTE_TO_HALT, true);
-        memory.set(HALTED_TRAVERSERS, new TraverserSet<>());
-        memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
         memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
         memory.set(COMPLETED_BARRIERS, new HashSet<>());
-        final TraversalSideEffects sideEffects = ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).getSideEffects();
-        sideEffects.keys().forEach(key -> memory.set(key, sideEffects.get(key)));
+        // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the stream
+        if (sideEffects.exists(HALTED_TRAVERSERS)) {
+            final TraverserSet<Object> haltedTraversers = sideEffects.get(HALTED_TRAVERSERS);
+            final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+            IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
+                traverser.setStepId(this.traversal.get().getStartStep().getId());
+                toProcessTraversers.add(traverser);
+            });
+            assert haltedTraversers.isEmpty(); // TODO: this should be empty
+            final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers);
+            memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
+        } else {
+            memory.set(HALTED_TRAVERSERS, new TraverserSet<>());
+            memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
+        }
     }
 
     @Override
@@ -197,7 +195,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final Memory memory) {
         // memory is distributed
-        ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).setMemory(memory, true);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, true);
         // if a barrier was completed in another worker, it is also completed here (ensure distributed barries are synchronized)
         final Set<String> completedBarriers = memory.get(COMPLETED_BARRIERS);
         for (final String stepId : completedBarriers) {
@@ -247,27 +245,20 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public boolean terminate(final Memory memory) {
         // memory is local
-        ((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).setMemory(memory, false);
+        MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, false);
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         memory.set(VOTE_TO_HALT, true);
         memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
         if (voteToHalt) {
-            final Set<String> mutatedMemoryKeys = memory.get(MUTATED_MEMORY_KEYS);
-            memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
             // local traverser sets to process
-            TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-            TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
+            final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
             final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
             final TraverserSet<Object> haltedTraversers = memory.get(HALTED_TRAVERSERS);
             // get all barrier traversers
             final Set<String> completedBarriers = new HashSet<>();
-            this.processMemory(memory, mutatedMemoryKeys, toProcessTraversers, completedBarriers);
+            MasterExecutor.processMemory(this.traversalMatrix, memory, toProcessTraversers, completedBarriers);
             // process all results from barriers locally and when elements are touched, put them in remoteActiveTraversers
-            while (!toProcessTraversers.isEmpty()) {
-                this.processTraversers(toProcessTraversers, localActiveTraversers, remoteActiveTraversers, haltedTraversers);
-                toProcessTraversers = localActiveTraversers;
-                localActiveTraversers = new TraverserSet<>();
-            }
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers);
             // tell parallel barriers that might not have been active in the last round that they are no longer active
             memory.set(COMPLETED_BARRIERS, completedBarriers);
             if (!remoteActiveTraversers.isEmpty() ||
@@ -295,90 +286,6 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         }
     }
 
-    private void processMemory(final Memory memory, final Set<String> toProcessMemoryKeys, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) {
-        for (final String key : toProcessMemoryKeys) {
-            final Step<Object, Object> step = this.traversalMatrix.getStepById(key);
-            if (null == step) continue;
-            assert step instanceof Barrier;
-            completedBarriers.add(step.getId());
-            if (!(step instanceof LocalBarrier)) {  // local barriers don't do any processing on the master traversal (they just lock on the workers)
-                final Barrier<Object> barrier = (Barrier<Object>) step;
-                barrier.addBarrier(memory.get(key));
-                while (step.hasNext()) {
-                    traverserSet.add(step.next());
-                }
-                if (step instanceof ReducingBarrierStep)
-                    memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get());
-            }
-        }
-    }
-
-    private void processTraversers(final TraverserSet<Object> toProcessTraversers, final TraverserSet<Object> localActiveTraversers, final TraverserSet<Object> remoteActiveTraversers, final TraverserSet<Object> haltedTraversers) {
-        Step<Object, Object> previousStep = EmptyStep.instance();
-        Step<Object, Object> currentStep = EmptyStep.instance();
-
-        final Iterator<Traverser.Admin<Object>> traversers = IteratorUtils.removeOnNext(toProcessTraversers.iterator());
-        while (traversers.hasNext()) {
-            final Traverser.Admin<Object> traverser = traversers.next();
-            traverser.set(DetachedFactory.detach(traverser.get(), true));
-            traverser.setSideEffects(this.traversal.get().getSideEffects());
-            if (traverser.isHalted()) {
-                traverser.detach();
-                haltedTraversers.add(traverser);
-            } else if (this.isRemoteTraverser(traverser)) {  // this is so that patterns like order().name work as expected.
-                traverser.detach();
-                remoteActiveTraversers.add(traverser);
-            } else {
-                currentStep = this.traversalMatrix.getStepById(traverser.getStepId());
-                if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep)) {
-                    previousStep.forEachRemaining(result -> {
-                        if (result.isHalted()) {
-                            result.detach();
-                            haltedTraversers.add(result);
-                        } else {
-                            if (this.isRemoteTraverser(result)) {
-                                result.detach();
-                                remoteActiveTraversers.add(result);
-                            } else
-                                localActiveTraversers.add(result);
-                        }
-                    });
-                }
-                currentStep.addStart(traverser);
-                previousStep = currentStep;
-            }
-        }
-        if (!(currentStep instanceof EmptyStep)) {
-            currentStep.forEachRemaining(traverser -> {
-                if (traverser.isHalted()) {
-                    traverser.detach();
-                    haltedTraversers.add(traverser);
-                } else {
-                    if (this.isRemoteTraverser(traverser)) {
-                        traverser.detach();
-                        remoteActiveTraversers.add(traverser);
-                    } else
-                        localActiveTraversers.add(traverser);
-                }
-            });
-        }
-        assert toProcessTraversers.isEmpty();
-    }
-
-    private boolean isRemoteTraverser(final Traverser.Admin traverser) {
-        return traverser.get() instanceof Attachable &&
-                !(traverser.get() instanceof Path) &&
-                !isLocalElement(this.traversalMatrix.getStepById(traverser.getStepId()));
-    }
-
-    // TODO: once this is complete (fully known), move to TraversalHelper
-    private static boolean isLocalElement(final Step<?, ?> step) {
-        return step instanceof PropertiesStep || step instanceof PropertyMapStep ||
-                step instanceof IdStep || step instanceof LabelStep || step instanceof SackStep ||
-                step instanceof PropertyKeyStep || step instanceof PropertyValueStep ||
-                step instanceof TailGlobalStep || step instanceof RangeGlobalStep || step instanceof HasStep;
-    }
-
     @Override
     public Set<VertexComputeKey> getVertexComputeKeys() {
         return VERTEX_COMPUTE_KEYS;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
index dfa7983..764df2b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ProgramVertexProgramStep.java
@@ -21,7 +21,9 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
 
 import org.apache.commons.configuration.MapConfiguration;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
@@ -47,7 +49,11 @@ public final class ProgramVertexProgramStep extends VertexProgramStep {
 
     @Override
     public VertexProgram generateProgram(final Graph graph) {
-        return VertexProgram.createVertexProgram(graph, new MapConfiguration(this.configuration));
+        final MapConfiguration base = new MapConfiguration(this.configuration);
+        base.setDelimiterParsingDisabled(true);
+        PureTraversal.storeState(base, TraversalVertexProgram.TRAVERSAL, this.getTraversal().clone());
+        base.setProperty(STEP_ID, this.getId());
+        return VertexProgram.createVertexProgram(graph, base);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index 8fe7ed0..9b9d5d7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.VertexComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -31,8 +32,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
 
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
@@ -44,6 +47,7 @@ import java.util.concurrent.Future;
  */
 public abstract class VertexProgramStep extends AbstractStep<ComputerResult, ComputerResult> implements VertexComputing {
 
+    public static final String STEP_ID = "gremlin.vertexProgramStep.stepId";
     protected Computer computer = Computer.compute();
 
     protected boolean first = true;
@@ -96,7 +100,7 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
     protected boolean previousTraversalVertexProgram() {
         Step<?, ?> currentStep = this;
         while (!(currentStep instanceof EmptyStep)) {
-            if(Thread.interrupted()) throw new TraversalInterruptedException();
+            if (Thread.interrupted()) throw new TraversalInterruptedException();
             if (currentStep instanceof TraversalVertexProgramStep)
                 return true;
             currentStep = currentStep.getPreviousStep();
@@ -106,13 +110,22 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
 
     private void processMemorySideEffects(final Memory memory) {
         // unfortunately there is no easy way to test this in a test case
-        assert this.isEndStep() == memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS);
+        //  assert this.isEndStep() == memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS);
         final TraversalSideEffects sideEffects = this.getTraversal().getSideEffects();
         for (final String key : memory.keys()) {
             if (sideEffects.exists(key)) {
                 sideEffects.set(key, memory.get(key));
             }
         }
+        if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS) && !this.isEndStep()) {
+            final TraverserSet<Object> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+            if (!haltedTraversers.isEmpty()) {
+                if (sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS))
+                    sideEffects.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+                else
+                    sideEffects.register(TraversalVertexProgram.HALTED_TRAVERSERS, new ConstantSupplier<>(haltedTraversers), Operator.addAll);
+            }
+        }
     }
 
     protected boolean isEndStep() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
index c12e624..85a2f83 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
@@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorat
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.VertexComputing;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ProgramVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
@@ -64,7 +65,7 @@ public final class VertexProgramStrategy extends AbstractTraversalStrategy<Trave
         Step<?, ?> currentStep = traversal.getEndStep();
         final Set<String> currentLabels = new HashSet<>();
         while (!(currentStep instanceof EmptyStep)) {
-            if (currentStep instanceof VertexComputing) {
+            if (currentStep instanceof VertexComputing && !(currentStep instanceof ProgramVertexProgramStep)) {  // todo: is there a general solution?
                 currentLabels.addAll(currentStep.getLabels());
                 currentStep.getLabels().forEach(currentStep::removeLabel);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalSideEffects.java
index 0816461..8a8c5e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalSideEffects.java
@@ -185,6 +185,7 @@ public class DefaultTraversalSideEffects implements TraversalSideEffects {
         this.objectMap.remove(key);
         this.supplierMap.remove(key);
         this.reducerMap.remove(key);
+        this.keys.remove(key);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyProgramTest.groovy
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyProgramTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyProgramTest.groovy
index 6630ffc..7abe113 100644
--- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyProgramTest.groovy
+++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyProgramTest.groovy
@@ -39,6 +39,11 @@ public abstract class GroovyProgramTest {
         public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasLabelXpersonX_programXpageRank_rankX_order_byXrank_incrX_valueMapXname_rankX() {
             new ScriptTraversal<>(g, "gremlin-groovy", "g.V.hasLabel('person').program(PageRankVertexProgram.build().property('rank').create(graph)).order.by('rank',incr).valueMap('name','rank')");
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_aggregateXxX_byXlangX_groupCount_programXTestProgramX_asXaX_selectXa_xX() {
+            new ScriptTraversal<>(g, "gremlin-groovy", "g.V.out('created').aggregate('x').by('lang').groupCount.program(new ${ProgramTest.TestProgram.class.getCanonicalName()}()).as('a').select('a', 'x')");
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
index 827286e..dcef79c 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
@@ -19,16 +19,40 @@
 
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
+import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ProgramVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.EmptyTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.junit.Assert.assertEquals;
@@ -44,6 +68,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasLabelXpersonX_programXpageRank_rankX_order_byXrank_incrX_valueMapXname_rankX();
 
+    public abstract Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_aggregateXxX_byXlangX_groupCount_programXTestProgramX_asXaX_selectXa_xX();
 
     @Test
     @LoadGraphWith(MODERN)
@@ -81,6 +106,30 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         assertEquals(4, counter);
     }
 
+    @Test
+    @LoadGraphWith(MODERN)
+    public void g_V_outXcreatedX_aggregateXxX_byXlangX_groupCount_programXTestProgramX_asXaX_selectXa_xX() {
+        final Traversal<Vertex, Map<String, Object>> traversal = get_g_V_outXcreatedX_aggregateXxX_byXlangX_groupCount_programXTestProgramX_asXaX_selectXa_xX();
+        final List<Map<String, Object>> results = traversal.toList();
+        assertFalse(traversal.hasNext());
+        assertEquals(4, results.size());
+        final BulkSet<String> bulkSet = new BulkSet<>();
+        bulkSet.add("java", 4);
+        for (int i = 0; i < 4; i++) {
+            assertEquals(bulkSet, results.get(i).get("x"));
+        }
+        final Set<String> strings = new HashSet<>();
+        strings.add((String) results.get(0).get("a"));
+        strings.add((String) results.get(1).get("a"));
+        strings.add((String) results.get(2).get("a"));
+        strings.add((String) results.get(3).get("a"));
+        assertEquals(4, strings.size());
+        assertTrue(strings.contains("hello"));
+        assertTrue(strings.contains("gremlin"));
+        assertTrue(strings.contains("lop"));
+        assertTrue(strings.contains("ripple"));
+    }
+
 
     public static class Traversals extends ProgramTest {
 
@@ -93,5 +142,136 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
         public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasLabelXpersonX_programXpageRank_rankX_order_byXrank_incrX_valueMapXname_rankX() {
             return g.V().hasLabel("person").program(PageRankVertexProgram.build().property("rank").create(graph)).order().by("rank", Order.incr).valueMap("name", "rank");
         }
+
+        @Override
+        public Traversal<Vertex, Map<String, Object>> get_g_V_outXcreatedX_aggregateXxX_byXlangX_groupCount_programXTestProgramX_asXaX_selectXa_xX() {
+            return g.V().out("created").aggregate("x").by("lang").groupCount().program(new TestProgram()).as("a").select("a", "x");
+        }
+    }
+
+    /////////////////////
+
+    public static class TestProgram extends StaticVertexProgram {
+
+        private PureTraversal<?, ?> traversal = new PureTraversal<>(EmptyTraversal.instance());
+        private Step programStep = EmptyStep.instance();
+
+        private final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
+
+        @Override
+        public void loadState(final Graph graph, final Configuration configuration) {
+            super.loadState(graph, configuration);
+            this.traversal = PureTraversal.loadState(configuration, TraversalVertexProgram.TRAVERSAL, graph);
+            this.programStep = new TraversalMatrix<>(this.traversal.get()).getStepById(configuration.getString(ProgramVertexProgramStep.STEP_ID));
+            this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, Operator.addAll, false, false));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.ACTIVE_TRAVERSERS, Operator.addAll, true, true));
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            super.storeState(configuration);
+            this.traversal.storeState(configuration, TraversalVertexProgram.TRAVERSAL);
+            configuration.setProperty(ProgramVertexProgramStep.STEP_ID, this.programStep.getId());
+        }
+
+        @Override
+        public void setup(final Memory memory) {
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, false);
+            final TraversalSideEffects sideEffects = this.traversal.get().getSideEffects();
+            final TraverserSet<Object> haltedTraversers = sideEffects.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+            sideEffects.remove(TraversalVertexProgram.HALTED_TRAVERSERS);
+            this.checkSideEffects();
+            final Map<Vertex, Long> map = (Map<Vertex, Long>) haltedTraversers.iterator().next().get();
+            assertEquals(2, map.size());
+            assertTrue(map.values().contains(3l));
+            assertTrue(map.values().contains(1l));
+            final TraverserSet<Object> activeTraversers = new TraverserSet<>();
+            map.keySet().forEach(vertex -> activeTraversers.add(haltedTraversers.peek().split(vertex, EmptyStep.instance())));
+            memory.set(TraversalVertexProgram.ACTIVE_TRAVERSERS, activeTraversers);
+
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, true);
+            this.checkSideEffects();
+            final TraverserSet<Vertex> activeTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+            if (vertex.label().equals("software")) {
+                assertEquals(1, activeTraversers.stream().filter(v -> v.get().equals(vertex)).count());
+                if (memory.isInitialIteration()) {
+                    assertFalse(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent());
+                    vertex.property(
+                            TraversalVertexProgram.HALTED_TRAVERSERS,
+                            new TraverserSet<>(this.traversal.get().getTraverserGenerator().generate(vertex.value("name"), this.programStep, 1l)));
+                } else {
+                    assertTrue(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent());
+                }
+            } else {
+                assertFalse(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent());
+                assertEquals(0, activeTraversers.stream().filter(v -> v.get().equals(vertex)).count());
+            }
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, false);
+            checkSideEffects();
+            final TraverserSet<String> haltedTraversers = new TraverserSet<>();
+            haltedTraversers.add(this.traversal.get().getTraverserGenerator().generate("hello", this.programStep, 1l));
+            haltedTraversers.add(this.traversal.get().getTraverserGenerator().generate("gremlin", this.programStep, 1l));
+            memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+            return !memory.isInitialIteration();
+        }
+
+        @Override
+        public void workerIterationStart(final Memory memory) {
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, true);
+            checkSideEffects();
+        }
+
+        @Override
+        public void workerIterationEnd(final Memory memory) {
+            MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, true);
+            checkSideEffects();
+        }
+
+        @Override
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false));
+        }
+
+        @Override
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return this.memoryComputeKeys;
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(Memory memory) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.EDGES;
+        }
+
+        ////////
+
+        private void checkSideEffects() {
+            final TraversalSideEffects sideEffects = this.traversal.get().getSideEffects();
+            assertTrue(sideEffects instanceof MemoryTraversalSideEffects);
+//            assertEquals(1, sideEffects.keys().size());
+            assertTrue(sideEffects.exists("x"));
+//            assertFalse(sideEffects.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
+            final BulkSet<String> bulkSet = sideEffects.get("x");
+            assertEquals(4, bulkSet.size());
+            assertEquals(4, bulkSet.get("java"));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/39d709d3/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 070f3b2..4980832 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -39,7 +39,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -68,24 +67,25 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
     public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
         vertexProgram.setup(memory);
         final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
-        final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds();    // any V(1,2,3)-style ids to filter on
+        final GraphStep<Vertex, Vertex> graphStep = ((GraphStep) traversal.getStartStep());
+        final Object[] graphStepIds = graphStep.getIds();    // any V(1,2,3)-style ids to filter on
         final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep(); // needed for the final traverser generation
         traversal.removeStep(0);                                    // remove GraphStep
         traversal.removeStep(traversal.getSteps().size() - 1);      // remove ReducingBarrierStep
         traversal.applyStrategies();                                // compile
         boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
         ///////////////////////////////
-        ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
+        MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, true); // any intermediate sideEffect steps are backed by SparkMemory
         memory.setInExecute(true);
         final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
                 .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                 .flatMap(vertexWritable -> {
                     if (identityTraversal)                          // g.V.count()-style (identity)
-                        return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
+                        return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
                     else {                                          // add the vertex to head of the traversal
                         return () -> {                              // and iterate it for its results
                             final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
-                            clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
+                            clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l));
                             return (Step) clone.getEndStep();
                         };
                     }


Mime
View raw message