tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [24/50] [abbrv] tinkerpop git commit: started on the process.actor interfaces in gremlin-core. Really cool as the naming/model lines up nicely with process.computer interfaces. ActorStep and ActorStrategy are now generalized to work with Actors interface
Date Wed, 11 Jan 2017 17:53:32 GMT
Started on the process.actor interfaces in gremlin-core. Really cool, as the naming/model lines up nicely with the process.computer interfaces. ActorStep and ActorStrategy are now generalized to work with the Actors interface and now live in gremlin-core. Pretty easy.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/78c22e11
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/78c22e11
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/78c22e11

Branch: refs/heads/TINKERPOP-1564
Commit: 78c22e1124f1c89b60e2d3964dfe17c49386b956
Parents: eb877b7
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Dec 13 05:52:06 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Jan 11 10:52:04 2017 -0700

----------------------------------------------------------------------
 .../gremlin/akka/process/actor/AkkaActors.java  |   6 +-
 .../process/actor/MasterTraversalActor.java     | 104 ++++++++++---------
 .../process/actor/WorkerTraversalActor.java     |  67 +++++++-----
 .../actor/message/SideEffectAddMessage.java     |  18 ++--
 .../actor/message/SideEffectSetMessage.java     |  42 ++++++++
 .../process/traversal/step/map/ActorStep.java   |  76 --------------
 .../strategy/decoration/ActorStrategy.java      |  83 ---------------
 .../verification/ActorVerificationStrategy.java |  60 -----------
 .../akka/process/AkkaActorsProvider.java        |   5 +-
 .../gremlin/akka/process/AkkaPlayTest.java      |   5 +-
 .../tinkerpop/gremlin/process/actor/Actor.java  |  31 ++++++
 .../tinkerpop/gremlin/process/actor/Actors.java |  32 ++++++
 .../gremlin/process/actor/MasterActor.java      |  35 +++++++
 .../gremlin/process/actor/WorkerActor.java      |  35 +++++++
 .../actor/traversal/step/map/ActorStep.java     |  80 ++++++++++++++
 .../strategy/decoration/ActorStrategy.java      |  86 +++++++++++++++
 .../verification/ActorVerificationStrategy.java |  60 +++++++++++
 17 files changed, 516 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index aa2a048..8ef96bb 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.actor.Actors;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -31,7 +32,7 @@ import java.util.concurrent.Future;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class AkkaActors<S, E> {
+public final class AkkaActors<S, E> implements Actors<S, E> {
 
     public final ActorSystem system;
     private TraverserSet<E> results = new TraverserSet<>();
@@ -41,7 +42,8 @@ public final class AkkaActors<S, E> {
         this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master");
     }
 
-    public Future<TraverserSet<E>> getResults() {
+    @Override
+    public Future<TraverserSet<E>> submit() {
         return CompletableFuture.supplyAsync(() -> {
             while (!this.system.isTerminated()) {
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
index d1a29d8..5009554 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
@@ -28,9 +28,11 @@ import akka.japi.pf.ReceiveBuilder;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.actor.MasterActor;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -54,7 +56,7 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, MasterActor {
 
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
@@ -81,47 +83,11 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
 
         receive(ReceiveBuilder.
-                match(Traverser.Admin.class, traverser -> {
-                    this.processTraverser(traverser);
-                }).
-                match(BarrierAddMessage.class, barrierMerge -> {
-                    // get the barrier updates from the workers to synchronize against the master barrier
-                    final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
-                    final Step<?, ?> step = (Step) barrier;
-                    GraphComputing.atMaster(step, true);
-                    barrier.addBarrier(barrierMerge.getBarrier());
-                    this.barriers.put(step.getId(), barrier);
-                }).
-                match(SideEffectAddMessage.class, sideEffect -> {
-                    // get the side-effect updates from the workers to generate the master side-effects
-                    this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
-                }).
-                match(VoteToHaltMessage.class, voteToHalt -> {
-                    assert !sender().equals(self());
-                    if (!this.barriers.isEmpty()) {
-                        for (final Barrier barrier : this.barriers.values()) {
-                            final Step<?, ?> step = (Step) barrier;
-                            if (!(barrier instanceof LocalBarrier)) {
-                                while (step.hasNext()) {
-                                    this.sendTraverser(step.next());
-                                }
-                            } else {
-                                this.traversal.getSideEffects().forEach((k, v) -> {
-                                    this.broadcast(new SideEffectAddMessage(k, v));
-                                });
-                                this.broadcast(new BarrierDoneMessage(barrier));
-                                barrier.done();
-                            }
-                        }
-                        this.barriers.clear();
-                        worker(this.leaderWorker).tell(StartMessage.instance(), self());
-                    } else {
-                        while (this.traversal.hasNext()) {
-                            this.results.add((Traverser.Admin) this.traversal.nextTraverser());
-                        }
-                        context().system().terminate();
-                    }
-                }).build());
+                match(Traverser.Admin.class, this::processTraverser).
+                match(BarrierAddMessage.class, barrierMerge -> this.processBarrierAdd((Barrier) this.matrix.getStepById(barrierMerge.getStepId()), barrierMerge.getBarrier())).
+                match(SideEffectAddMessage.class, sideEffect -> this.processSideEffectAdd(((SideEffectAddMessage) sideEffect).getKey(), ((SideEffectAddMessage) sideEffect).getValue())).
+                match(VoteToHaltMessage.class, voteToHalt -> this.processVoteToHalt()).
+                build());
     }
 
     private void initializeWorkers() {
@@ -137,13 +103,49 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         this.workers.clear();
     }
 
-    private void broadcast(final Object message) {
-        for (final Partition partition : this.partitioner.getPartitions()) {
-            worker("worker-" + partition.hashCode()).tell(message, self());
+    @Override
+    public void processBarrierAdd(final Barrier barrier, final Object barrierAddition) {
+        final Step<?, ?> step = (Step) barrier;
+        GraphComputing.atMaster(step, true);
+        barrier.addBarrier(barrierAddition);
+        this.barriers.put(step.getId(), barrier);
+    }
+
+    @Override
+    public void processSideEffectAdd(final String key, final Object value) {
+        this.traversal.getSideEffects().add(key, value);
+    }
+
+    @Override
+    public void processVoteToHalt() {
+        assert !sender().equals(self());
+        if (!this.barriers.isEmpty()) {
+            for (final Barrier barrier : this.barriers.values()) {
+                final Step<?, ?> step = (Step) barrier;
+                if (!(barrier instanceof LocalBarrier)) {
+                    while (step.hasNext()) {
+                        this.sendTraverser(step.next());
+                    }
+                } else {
+                    this.traversal.getSideEffects().forEach((k, v) -> {
+                        this.broadcast(new SideEffectSetMessage(k, v));
+                    });
+                    this.broadcast(new BarrierDoneMessage(barrier));
+                    barrier.done();
+                }
+            }
+            this.barriers.clear();
+            worker(this.leaderWorker).tell(StartMessage.instance(), self());
+        } else {
+            while (this.traversal.hasNext()) {
+                this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+            }
+            context().system().terminate();
         }
     }
 
-    private void processTraverser(final Traverser.Admin traverser) {
+    @Override
+    public void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted() || traverser.get() instanceof Element) {
             this.sendTraverser(traverser);
         } else {
@@ -156,6 +158,14 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         }
     }
 
+    ////////////////
+
+    private void broadcast(final Object message) {
+        for (final Partition partition : this.partitioner.getPartitions()) {
+            worker("worker-" + partition.hashCode()).tell(message, self());
+        }
+    }
+
     private void sendTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted())
             this.results.add(traverser);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
index 63eb707..862b6b3 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
@@ -26,9 +26,10 @@ import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
 import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.actor.WorkerActor;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -51,7 +52,7 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, WorkerActor {
 
     // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
     public enum Terminate {
@@ -94,36 +95,15 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
         this.isLeader = i == 0;
 
         receive(ReceiveBuilder.
-                match(StartMessage.class, start -> {
-                    // initial message from master that says: "start processing"
-                    final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
-                    while (step.hasNext()) {
-                        this.sendTraverser(step.next());
-                    }
-                    // internal vote to have in mailbox as final message to process
-                    assert null == this.terminate;
-                    if (this.isLeader) {
-                        this.terminate = Terminate.MAYBE;
-                        self().tell(VoteToHaltMessage.instance(), self());
-                    }
-                }).
-                match(Traverser.Admin.class, traverser -> {
-                    this.processTraverser(traverser);
-                }).
-                match(SideEffectAddMessage.class, sideEffect -> {
-                    this.matrix.getTraversal().getSideEffects().set(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
-                }).
+                match(StartMessage.class, start -> this.processStart()).
+                match(Traverser.Admin.class, this::processTraverser).
+                match(SideEffectSetMessage.class, sideEffect -> this.processSideEffectSet(sideEffect.getKey(), sideEffect.getValue())).
+                match(BarrierDoneMessage.class, barrierDone -> this.processBarrierDone(this.matrix.getStepById(barrierDone.getStepId()))).
                 match(Terminate.class, terminate -> {
                     assert this.isLeader || this.terminate != Terminate.MAYBE;
                     this.terminate = terminate;
                     self().tell(VoteToHaltMessage.instance(), self());
                 }).
-                match(BarrierDoneMessage.class, barrierDone -> {
-                    final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierDone.getStepId());
-                    while (step.hasNext()) {
-                        sendTraverser(step.next());
-                    }
-                }).
                 match(VoteToHaltMessage.class, haltSync -> {
                     // if there is a barrier and thus, halting at barrier, then process barrier
                     if (!this.barriers.isEmpty()) {
@@ -151,7 +131,23 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
         );
     }
 
-    private void processTraverser(final Traverser.Admin traverser) {
+    @Override
+    public void processStart() {
+        // initial message from master that says: "start processing"
+        final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+        while (step.hasNext()) {
+            this.sendTraverser(step.next());
+        }
+        // internal vote to have in mailbox as final message to process
+        assert null == this.terminate;
+        if (this.isLeader) {
+            this.terminate = Terminate.MAYBE;
+            self().tell(VoteToHaltMessage.instance(), self());
+        }
+    }
+
+    @Override
+    public void processTraverser(final Traverser.Admin traverser) {
         assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
         final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
         if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
@@ -166,6 +162,21 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
         }
     }
 
+    @Override
+    public void processBarrierDone(final Barrier barrier) {
+        final Step<?, ?> step = (Step) barrier;
+        while (step.hasNext()) {
+            sendTraverser(step.next());
+        }
+    }
+
+    @Override
+    public void processSideEffectSet(final String key, final Object value) {
+        this.matrix.getTraversal().getSideEffects().set(key, value);
+    }
+
+    //////////////////////
+
     private void sendTraverser(final Traverser.Admin traverser) {
         this.voteToHalt = false;
         if (traverser.isHalted())

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
index 4a54d97..2d97bfa 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
@@ -24,20 +24,20 @@ package org.apache.tinkerpop.gremlin.akka.process.actor.message;
  */
 public final class SideEffectAddMessage {
 
-    private final String sideEffectKey;
-    private final Object sideEffect;
+    private final String key;
+    private final Object value;
 
-    public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) {
-        this.sideEffect = sideEffect;
-        this.sideEffectKey = sideEffectKey;
+    public SideEffectAddMessage(final String key, final Object value) {
+        this.value = value;
+        this.key = key;
     }
 
-    public String getSideEffectKey() {
-        return this.sideEffectKey;
+    public String getKey() {
+        return this.key;
     }
 
-    public Object getSideEffectValue() {
-        return this.sideEffect;
+    public Object getValue() {
+        return this.value;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
new file mode 100644
index 0000000..023133b
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectSetMessage {
+
+    private final String key;
+    private final Object value;
+
+    public SideEffectSetMessage(final String key, final Object value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public Object getValue() {
+        return this.value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
deleted file mode 100644
index db41493..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.traversal.step.map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-
-import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors;
-import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.util.NoSuchElementException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorStep<S, E> extends AbstractStep<E, E> {
-
-    public final Traversal.Admin<S, E> partitionTraversal;
-    private final Partitioner partitioner;
-    private boolean first = true;
-
-    public ActorStep(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
-        super(traversal);
-        this.partitionTraversal = (Traversal.Admin) traversal.clone();
-        final TraversalStrategies strategies = this.partitionTraversal.getStrategies().clone();
-        strategies.removeStrategies(ActorStrategy.class);
-        strategies.addStrategies(VertexProgramStrategy.instance());
-        this.partitionTraversal.setStrategies(strategies);
-        this.partitioner = partitioner;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.stepString(this, this.partitionTraversal);
-    }
-
-    @Override
-    protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
-        if (this.first) {
-            this.first = false;
-            final AkkaActors<S, E> actors = new AkkaActors<>(this.partitionTraversal, this.partitioner);
-            try {
-                actors.getResults().get().forEach(this.starts::add);
-            } catch (final Exception e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-        return this.starts.next();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
deleted file mode 100644
index adbc257..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration;
-
-import org.apache.tinkerpop.gremlin.akka.process.traversal.step.map.ActorStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
-import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
-        implements TraversalStrategy.DecorationStrategy {
-
-
-    private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
-    private static final Set<Class<? extends DecorationStrategy>> POSTS = Collections.singleton(VertexProgramStrategy.class);
-
-    private final Partitioner partitioner;
-
-    public ActorStrategy(final Partitioner partitioner) {
-        this.partitioner = partitioner;
-    }
-
-    @Override
-    public void apply(final Traversal.Admin<?, ?> traversal) {
-        ReadOnlyStrategy.instance().apply(traversal);
-        if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
-            throw new VerificationException("Inject traversal currently not supported", traversal);
-
-        if (!(traversal.getParent() instanceof EmptyStep))
-            return;
-
-        final ActorStep<?, ?> actorStep = new ActorStep<>(traversal, this.partitioner);
-        TraversalHelper.removeAllSteps(traversal);
-        traversal.addStep(actorStep);
-
-        // validations
-        assert traversal.getStartStep().equals(actorStep);
-        assert traversal.getSteps().size() == 1;
-        assert traversal.getEndStep() == actorStep;
-    }
-
-    @Override
-    public Set<Class<? extends DecorationStrategy>> applyPost() {
-        return POSTS;
-    }
-
-    @Override
-    public Set<Class<? extends DecorationStrategy>> applyPrior() {
-        return PRIORS;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
deleted file mode 100644
index a9e3f7b..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
-
-    private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
-
-    private ActorVerificationStrategy() {
-    }
-
-    @Override
-    public void apply(final Traversal.Admin<?, ?> traversal) {
-        if (!TraversalHelper.onGraphComputer(traversal))
-            return;
-        final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
-        for (final Step<?, ?> step : traversal.getSteps()) {
-            // only global children are graph computing
-            if (globalChild && step instanceof GraphComputing)
-                ((GraphComputing) step).onGraphComputer();
-
-            for (String label : step.getLabels()) {
-                if (Graph.Hidden.isHidden(label))
-                    step.removeLabel(label);
-            }
-        }
-    }
-
-    public static ActorVerificationStrategy instance() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
index 9158040..19e0628 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
@@ -22,7 +22,8 @@ package org.apache.tinkerpop.gremlin.akka.process;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
-import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
+import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
@@ -134,7 +135,7 @@ public class AkkaActorsProvider extends AbstractGraphProvider {
             //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance());
         else {
             final GraphTraversalSource g = graph.traversal();
-            return g.withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3)));
+            return g.withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3)));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index b064331..bed7636 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -19,7 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.akka.process;
 
-import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy;
+import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -40,7 +41,7 @@ public class AkkaPlayTest {
     public void testPlay1() throws Exception {
         final Graph graph = TinkerGraph.open();
         graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
-        GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3)));
+        GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3)));
         System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList());
         //3, 1.9, 1
         /*for (int i = 0; i < 10000; i++) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
new file mode 100644
index 0000000..aa2f429
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Actor {
+
+    public <S> void processTraverser(final Traverser.Admin<S> traverser);
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
new file mode 100644
index 0000000..d9e257e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+
+import java.util.concurrent.Future;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Actors<S, E> {
+
+    public Future<TraverserSet<E>> submit();
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
new file mode 100644
index 0000000..87efe51
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface MasterActor extends Actor {
+
+    public <V> void processBarrierAdd(final Barrier barrier, final V barrierAddition);
+
+    public <V> void processSideEffectAdd(final String key, final V value);
+
+    public void processVoteToHalt();
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
new file mode 100644
index 0000000..6d4ca64
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface WorkerActor extends Actor {
+
+    public void processStart();
+
+    public void processBarrierDone(final Barrier barrier);
+
+    public void processSideEffectSet(final String key, final Object value);
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
new file mode 100644
index 0000000..e86810e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
@@ -0,0 +1,80 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+
+import org.apache.tinkerpop.gremlin.process.actor.Actors;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorStep<S, E> extends AbstractStep<E, E> {
+
+    private final Class<? extends Actors> actorsClass;
+    private final Traversal.Admin<S, E> partitionTraversal;
+    private final Partitioner partitioner;
+
+    private boolean first = true;
+
+
+    public ActorStep(final Traversal.Admin<?, ?> traversal, final Class<? extends Actors> actorsClass, final Partitioner partitioner) {
+        super(traversal);
+        this.actorsClass = actorsClass;
+        this.partitionTraversal = (Traversal.Admin) traversal.clone();
+        final TraversalStrategies strategies = this.partitionTraversal.getStrategies().clone();
+        strategies.removeStrategies(ActorStrategy.class);
+        strategies.addStrategies(VertexProgramStrategy.instance());
+        this.partitionTraversal.setStrategies(strategies);
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.stepString(this, this.partitionTraversal);
+    }
+
+    @Override
+    protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
+        if (this.first) {
+            this.first = false;
+            try {
+                final Actors<S, E> actors = this.actorsClass.getConstructor(Traversal.Admin.class, Partitioner.class).newInstance(this.partitionTraversal, this.partitioner);
+                actors.submit().get().forEach(this.starts::add);
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+        return this.starts.next();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java
new file mode 100644
index 0000000..ca9f64c
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actors;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.ActorStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
+        implements TraversalStrategy.DecorationStrategy {
+
+
+    private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
+    private static final Set<Class<? extends DecorationStrategy>> POSTS = Collections.singleton(VertexProgramStrategy.class);
+
+    private final Partitioner partitioner;
+    private final Class<? extends Actors> actors;
+
+    public ActorStrategy(final Class<? extends Actors> actors, final Partitioner partitioner) {
+        this.actors = actors;
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        ReadOnlyStrategy.instance().apply(traversal);
+        if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
+            throw new VerificationException("Inject traversal currently not supported", traversal);
+
+        if (!(traversal.getParent() instanceof EmptyStep))
+            return;
+
+        final ActorStep<?, ?> actorStep = new ActorStep<>(traversal, this.actors, this.partitioner);
+        TraversalHelper.removeAllSteps(traversal);
+        traversal.addStep(actorStep);
+
+        // validations
+        assert traversal.getStartStep().equals(actorStep);
+        assert traversal.getSteps().size() == 1;
+        assert traversal.getEndStep() == actorStep;
+    }
+
+    @Override
+    public Set<Class<? extends DecorationStrategy>> applyPost() {
+        return POSTS;
+    }
+
+    @Override
+    public Set<Class<? extends DecorationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/78c22e11/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
new file mode 100644
index 0000000..6e0b410
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -0,0 +1,60 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
+
+    private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
+
+    private ActorVerificationStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        if (!TraversalHelper.onGraphComputer(traversal))
+            return;
+        final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
+        for (final Step<?, ?> step : traversal.getSteps()) {
+            // only global children are graph computing
+            if (globalChild && step instanceof GraphComputing)
+                ((GraphComputing) step).onGraphComputer();
+
+            for (String label : step.getLabels()) {
+                if (Graph.Hidden.isHidden(label))
+                    step.removeLabel(label);
+            }
+        }
+    }
+
+    public static ActorVerificationStrategy instance() {
+        return INSTANCE;
+    }
+}


Mime
View raw message