tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [48/50] [abbrv] tinkerpop git commit: refactored GraphActors packaging -- its not actor/, but actors/. JavaDoc and various cleanups. Also, about to NOT serialize a traversal but instead use Bytecode. Next push will do this with TraveraslVertexProgram.
Date Tue, 10 Jan 2017 16:11:56 GMT
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
new file mode 100644
index 0000000..9b8fe38
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.Processor;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.concurrent.Future;
+
+/**
+ * GraphActors is a message-passing based graph {@link Processor} that is:
+ * asynchronous, distributed, and partition-centric.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface GraphActors<R> extends Processor {
+
+    public static final String GRAPH_ACTORS = "gremlin.graphActors";
+    public static final String GRAPH_ACTORS_WORKERS = "gremlin.graphActors.workers";
+
+    /**
+     * Provide the {@link ActorProgram} that the GraphActors will execute.
+     *
+     * @param program the program to execute
+     * @return the updated GraphActors with newly defined program
+     */
+    public GraphActors<R> program(final ActorProgram program);
+
+    /**
+     * Specify the number of workers per {@link Graph} {@link org.apache.tinkerpop.gremlin.structure.Partition}.
+     *
+     * @param workers the number of workers per partition
+     * @return the updated GraphActors with newly defined workers
+     */
+    public GraphActors<R> workers(final int workers);
+
+    /**
+     * Add an arbitrary configuration to the GraphActors system.
+     * Typically, these configurations are provider-specific and do not generalize across all GraphActor implementations.
+     *
+     * @param key   the key of the configuration
+     * @param value the value of the configuration
+     * @return the updated GraphActors with newly defined configuration
+     */
+    public GraphActors<R> configure(final String key, final Object value);
+
+    /**
+     * Execute the {@link ActorProgram} on the {@link GraphActors} system against the specified {@link Graph}.
+     *
+     * @return a {@link Future} denoting a reference to the asynchronous computation's result
+     */
+    @Override
+    public Future<R> submit(final Graph graph);
+
+    /**
+     * Returns an {@link ActorProgramStrategy} which enables a {@link Traversal} to execute on {@link GraphActors}.
+     *
+     * @return a {@link org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy} capable of executing traversals on a GraphActors system
+     */
+    @Override
+    public default ProcessorTraversalStrategy<GraphActors> getProcessorTraversalStrategy() {
+        return new ActorProgramStrategy(this);
+    }
+
+    /**
+     * Create an arbitrary GraphActors system given the information contained in the provided {@link Configuration}.
+     *
+     * @param configuration the {@link Configuration} containing, at minimum, {@link GraphActors#GRAPH_ACTORS} system class name
+     * @param <A>           the particular type of GraphActors
+     * @return a constructed GraphActors system
+     */
+    public static <A extends GraphActors> A open(final Configuration configuration) {
+        try {
+            return (A) Class.forName(configuration.getString(GRAPH_ACTORS)).getMethod("open", Configuration.class).invoke(null, configuration);
+        } catch (final Exception e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+
+    public static <A extends GraphActors> A open(final Class<A> graphActorsClass) {
+        final BaseConfiguration configuration = new BaseConfiguration();
+        configuration.setProperty(GRAPH_ACTORS, graphActorsClass.getCanonicalName());
+        return GraphActors.open(configuration);
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
new file mode 100644
index 0000000..484b904
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.LazyBarrierStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.MatchPredicateStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.PathRetractionStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgram<R> implements ActorProgram {
+
+    public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode";
+
+    private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
+            StartMessage.class,
+            Traverser.class,
+            SideEffectAddMessage.class,
+            BarrierAddMessage.class,
+            SideEffectSetMessage.class,
+            BarrierDoneMessage.class,
+            Terminate.class);
+
+    private Traversal.Admin<?, R> traversal;
+    public TraverserSet<R> result = new TraverserSet<>();
+
+    public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
+        this.traversal = traversal;
+        final TraversalStrategies strategies = this.traversal.getStrategies().clone();
+        strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
+        // TODO: make TinkerGraph/etc. strategies smart about actors
+        new ArrayList<>(strategies.toList()).stream().
+                filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy).
+                map(TraversalStrategy::getClass).
+                forEach(strategies::removeStrategies);
+        strategies.removeStrategies(
+                ActorProgramStrategy.class,
+                LazyBarrierStrategy.class,
+                RepeatUnrollStrategy.class,
+                MatchPredicateStrategy.class,
+                InlineFilterStrategy.class,
+                PathRetractionStrategy.class);
+        this.traversal.setStrategies(strategies);
+        this.traversal.applyStrategies();
+    }
+
+    @Override
+    public void storeState(final Configuration configuration) {
+        configuration.setProperty(ACTOR_PROGRAM, TraversalActorProgram.class.getCanonicalName());
+        configuration.setProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE, this.traversal.getBytecode());
+    }
+
+    @Override
+    public void loadState(final Graph graph, final Configuration configuration) {
+        final Bytecode bytecode = (Bytecode) configuration.getProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE);
+        this.traversal = (Traversal.Admin<?, R>) JavaTranslator.of(graph.traversal()).translate(bytecode);
+    }
+
+    @Override
+    public TraversalActorProgram.Worker createWorkerProgram(final Actor.Worker worker) {
+        return new TraversalWorkerProgram(worker, this.traversal.clone());
+    }
+
+    @Override
+    public TraversalActorProgram.Master createMasterProgram(final Actor.Master master) {
+        return new TraversalMasterProgram(master, this.traversal.clone(), this.result);
+    }
+
+    @Override
+    public Optional<List<Class>> getMessagePriorities() {
+        return Optional.of(MESSAGE_PRIORITIES);
+    }
+
+    @Override
+    public TraversalActorProgram<R> clone() {
+        try {
+            final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone();
+            clone.traversal = this.traversal.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
new file mode 100644
index 0000000..e447cdb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -0,0 +1,179 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalMasterProgram implements ActorProgram.Master<Object> {
+
+    private final Actor.Master master;
+    private final Traversal.Admin<?, ?> traversal;
+    private final TraversalMatrix<?, ?> matrix;
+    private Map<String, Barrier> barriers = new HashMap<>();
+    private final TraverserSet<?> results;
+    private Address.Worker leaderWorker;
+    private int orderCounter = -1;
+    private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+
+    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
+        this.traversal = traversal;
+        // System.out.println("master[created]: " + master.address().getId());
+        // System.out.println(this.traversal);
+        this.matrix = new TraversalMatrix<>(this.traversal);
+        this.results = results;
+        this.master = master;
+        Distributing.configure(this.traversal, true, true);
+        Pushing.configure(this.traversal, true, false);
+    }
+
+    @Override
+    public void setup() {
+        this.leaderWorker = this.master.workers().get(0);
+        for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
+            this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
+        }
+        this.broadcast(StartMessage.instance());
+        this.master.send(this.leaderWorker, Terminate.MAYBE);
+    }
+
+    @Override
+    public void execute(final Object message) {
+        if (message instanceof Traverser.Admin) {
+            this.processTraverser((Traverser.Admin) message);
+        } else if (message instanceof BarrierAddMessage) {
+            final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
+            final Step<?, ?> step = (Step) barrier;
+            barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
+            this.barriers.put(step.getId(), barrier);
+        } else if (message instanceof SideEffectAddMessage) {
+            this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+        } else if (message instanceof Terminate) {
+            assert Terminate.YES == message;
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    final Step<?, ?> step = (Step) barrier;
+                    if (!(barrier instanceof LocalBarrier)) {
+                        this.orderBarrier(step);
+                        if (step instanceof OrderGlobalStep) this.orderCounter = 0;
+                        while (step.hasNext()) {
+                            this.sendTraverser(-1 == this.orderCounter ?
+                                    step.next() :
+                                    new OrderedTraverser<>(step.next(), this.orderCounter++));
+                        }
+                    } else {
+                        if (step instanceof SideEffectCapable) {
+                            final String key = ((SideEffectCapable) step).getSideEffectKey();
+                            this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+                        }
+                        this.broadcast(new BarrierDoneMessage(barrier));
+                        barrier.done();
+                    }
+                }
+                this.barriers.clear();
+                this.master.send(this.leaderWorker, Terminate.MAYBE);
+            } else {
+                while (this.traversal.hasNext()) {
+                    this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+                }
+                if (this.orderCounter != -1)
+                    this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+
+                this.master.close();
+            }
+        } else {
+            throw new IllegalStateException("Unknown message:" + message);
+        }
+    }
+
+    @Override
+    public void terminate() {
+        this.master.result().setResult(this.results);
+    }
+
+    private void broadcast(final Object message) {
+        for (final Address.Worker worker : this.master.workers()) {
+            this.master.send(worker, message);
+        }
+    }
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted() || traverser.get() instanceof Element) {
+            this.sendTraverser(traverser);
+        } else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+            step.addStart(traverser);
+            if (step instanceof Barrier) {
+                this.barriers.put(step.getId(), (Barrier) step);
+            } else {
+                while (step.hasNext()) {
+                    this.processTraverser(step.next());
+                }
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted())
+            this.results.add(traverser);
+        else if (traverser.get() instanceof Element)
+            this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
+        else
+            this.master.send(this.master.address(), traverser);
+    }
+
+    private void orderBarrier(final Step step) {
+        if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) {
+            final Barrier barrier = (Barrier) step;
+            final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier();
+            rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+            barrier.addBarrier(rangingBarrier);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
new file mode 100644
index 0000000..2aaa7b5
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -0,0 +1,170 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
+
+    private final Actor.Worker self;
+    private final TraversalMatrix<?, ?> matrix;
+    private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+    //
+    private Address.Worker neighborWorker;
+    private boolean isLeader;
+    private Terminate terminate = null;
+    private boolean voteToHalt = false;
+    private Map<String, Barrier> barriers = new HashMap<>();
+
+    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
+        this.self = self;
+        // System.out.println("worker[created]: " + this.self.address().getId());
+        // set up partition and traversal information
+        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+        TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
+        this.matrix = new TraversalMatrix<>(traversal);
+        Distributing.configure(traversal, false, true);
+        Pushing.configure(traversal, true, false);
+        //////
+        final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+        if (0 == graphStep.getIds().length)
+            ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges);
+        else {
+            if (graphStep.returnsVertex())
+                ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains));
+            else
+                ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
+        }
+    }
+
+    @Override
+    public void setup() {
+        // create termination ring topology
+        final int i = this.self.workers().indexOf(this.self.address());
+        this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
+        this.isLeader = i == 0;
+        for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
+            this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
+        }
+    }
+
+    @Override
+    public void execute(final Object message) {
+        //System.out.println(message + "::" + this.isLeader);
+        if (message instanceof StartMessage) {
+            // initial message from master that says: "start processing"
+            final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        } else if (message instanceof Traverser.Admin) {
+            this.processTraverser((Traverser.Admin) message);
+        } else if (message instanceof SideEffectSetMessage) {
+            this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
+        } else if (message instanceof BarrierDoneMessage) {
+            final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
+            while (step.hasNext()) {
+                sendTraverser(step.next());
+            }
+        } else if (message instanceof Terminate) {
+            assert null == this.terminate;
+            this.terminate = (Terminate) message;
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    while (barrier.hasNextBarrier()) {
+                        this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                    }
+                }
+                this.barriers.clear();
+            }
+            // use termination token to determine termination condition
+            if (this.isLeader) {
+                if (this.voteToHalt && Terminate.YES == this.terminate)
+                    this.self.send(this.self.master(), Terminate.YES);
+                else
+                    this.self.send(this.neighborWorker, Terminate.YES);
+            } else
+                this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
+            this.terminate = null;
+            this.voteToHalt = true;
+        } else {
+            throw new IllegalArgumentException("The following message is unknown: " + message);
+        }
+    }
+
+    @Override
+    public void terminate() {
+
+    }
+
+    //////////////
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
+        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+        step.addStart(traverser);
+        if (step instanceof Barrier) {
+            this.barriers.put(step.getId(), (Barrier) step);
+        } else {
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        this.voteToHalt = false;
+        if (traverser.isHalted())
+            this.self.send(this.self.master(), traverser);
+        else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
+            this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
+        else
+            this.self.send(this.self.address(), traverser);
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
new file mode 100644
index 0000000..b660eda
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+    private TraversalSideEffects sideEffects;
+    private Actor.Worker worker;
+
+
+    private WorkerTraversalSideEffects() {
+        // for serialization
+    }
+
+    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+        this.sideEffects = sideEffects;
+        this.worker = worker;
+    }
+
+    public TraversalSideEffects getSideEffects() {
+        return this.sideEffects;
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.sideEffects.set(key, value);
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        return this.sideEffects.get(key);
+    }
+
+    @Override
+    public void remove(final String key) {
+        this.sideEffects.remove(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.sideEffects.keys();
+    }
+
+    @Override
+    public void add(final String key, final Object value) {
+        this.sideEffects.add(key, value);
+        this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
+    }
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.register(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) {
+        return this.sideEffects.getReducer(key);
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) {
+        return this.sideEffects.getSupplier(key);
+    }
+
+    @Override
+    @Deprecated
+    public void registerSupplier(final String key, final Supplier supplier) {
+        this.sideEffects.registerSupplier(key, supplier);
+    }
+
+    @Override
+    @Deprecated
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        return this.sideEffects.getRegisteredSupplier(key);
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        return this.sideEffects.getSackInitialValue();
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        return this.sideEffects.getSackSplitter();
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        return this.sideEffects.getSackMerger();
+    }
+
+    @Override
+    public TraversalSideEffects clone() {
+        try {
+            final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
+            clone.sideEffects = this.sideEffects.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        this.sideEffects.mergeInto(sideEffects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..ac4c61d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+    private final Object barrier;
+    private final String stepId;
+
+    public BarrierAddMessage(final Barrier barrier) {
+        this.barrier = barrier.nextBarrier();
+        this.stepId = ((Step) barrier).getId();
+    }
+
+    public Object getBarrier() {
+        return this.barrier;
+    }
+
+    public String getStepId() {
+        return this.stepId;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
new file mode 100644
index 0000000..7979c33
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierDoneMessage {
+
+    private final String stepId;
+
+    public BarrierDoneMessage(final Barrier barrier) {
+        this.stepId = ((Step) barrier).getId();
+
+    }
+
+    public String getStepId() {
+        return this.stepId;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
new file mode 100644
index 0000000..1c0a9de
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectAddMessage {
+
+    private final String key;
+    private final Object value;
+
+    public SideEffectAddMessage(final String key, final Object value) {
+        this.value = value;
+        this.key = key;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public Object getValue() {
+        return this.value;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
new file mode 100644
index 0000000..84788f9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectSetMessage {
+
+    private final String key;
+    private final Object value;
+
+    public SideEffectSetMessage(final String key, final Object value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public Object getValue() {
+        return this.value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
new file mode 100644
index 0000000..e704033
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartMessage {
+
+    private static final StartMessage INSTANCE = new StartMessage();
+
+    private StartMessage() {
+    }
+
+    public static StartMessage instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
new file mode 100644
index 0000000..5621528
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum Terminate {
+
+    MAYBE, YES, NO
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
new file mode 100644
index 0000000..d6b8858
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.step.map;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
+
+
+    private final Traversal.Admin<S, E> actorsTraversal;
+    private final Configuration graphActorsConfiguration;
+    private boolean first = true;
+
+    public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Configuration graphActorsConfiguration) {
+        super(traversal);
+        this.graphActorsConfiguration = graphActorsConfiguration;
+        this.actorsTraversal = (Traversal.Admin) traversal.clone();
+        this.actorsTraversal.setParent(EmptyStep.instance());
+    }
+
+    @Override
+    protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
+        if (this.first) {
+            this.first = false;
+            try {
+                final GraphActors<TraverserSet<E>> graphActors = GraphActors.open(this.graphActorsConfiguration);
+                final ActorProgram actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
+                graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get().forEach(this.starts::add);
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+        return this.starts.next();
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.stepString(this, this.actorsTraversal);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
new file mode 100644
index 0000000..6e4365e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.step.map.TraversalActorProgramStep;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
+        implements TraversalStrategy.DecorationStrategy, ProcessorTraversalStrategy<GraphActors> {
+
+    private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
+
+    private final Configuration graphActorsConfiguration;
+
+    public ActorProgramStrategy(final GraphActors graphActors) {
+        this.graphActorsConfiguration = graphActors.configuration();
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        ReadOnlyStrategy.instance().apply(traversal);
+
+        if (!(traversal.getParent() instanceof EmptyStep))
+            return;
+
+        final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.graphActorsConfiguration);
+        TraversalHelper.removeAllSteps(traversal);
+        traversal.addStep(actorStep);
+
+        // validations
+        assert traversal.getStartStep().equals(actorStep);
+        assert traversal.getSteps().size() == 1;
+        assert traversal.getEndStep() == actorStep;
+    }
+
+    @Override
+    public Set<Class<? extends DecorationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+
+    ////////////////////////////////////////////////////////////
+
+    @Override
+    public Configuration getConfiguration() {
+        return this.graphActorsConfiguration;
+    }
+
+    public static ActorProgramStrategy create(final Configuration configuration) {
+        try {
+            return new ActorProgramStrategy(GraphActors.open(configuration));
+        } catch (final Exception e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public GraphActors getProcessor() {
+        return GraphActors.open(this.graphActorsConfiguration);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
new file mode 100644
index 0000000..cdf5465
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
+
+    private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
+
+    private ActorVerificationStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
+            throw new VerificationException("Inject traversal currently not supported", traversal);
+    }
+
+    public static ActorVerificationStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
new file mode 100644
index 0000000..208a9a1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.util;
+
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class DefaultActorsResult<R> implements ActorsResult<R> {
+
+    private R result;
+
+    public DefaultActorsResult() {
+
+    }
+
+    public R getResult() {
+        return this.result;
+    }
+
+    public void setResult(final R result) {
+        this.result = result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
new file mode 100644
index 0000000..2af1ecd
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphActorsHelper {
+
+    private GraphActorsHelper() {
+
+    }
+
+    public static GraphActors configure(GraphActors actors, final Configuration configuration) {
+        final Iterator<String> keys = IteratorUtils.asList(configuration.getKeys()).iterator();
+        while (keys.hasNext()) {
+            final String key = keys.next();
+            if (key.equals(GraphActors.GRAPH_ACTORS_WORKERS))
+                actors = actors.workers(configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS));
+            else if (!key.equals(GraphActors.GRAPH_ACTORS))
+                actors = actors.configure(key, configuration.getProperty(key));
+        }
+        return actors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index 96dae61..42bd864 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal;
 
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index f7c350d..ee7a196 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -19,7 +19,7 @@
 package org.apache.tinkerpop.gremlin.structure.util;
 
 import org.apache.tinkerpop.gremlin.process.Processor;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
index 43b3608..482577f 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
@@ -19,7 +19,7 @@
 package org.apache.tinkerpop.gremlin;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
index 9d63b3c..d0d877d 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
@@ -19,7 +19,7 @@
 package org.apache.tinkerpop.gremlin;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/edc51a04/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index ec3ece2..cc9d995 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actors;
 
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.junit.Test;
 


Mime
View raw message