tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [17/50] [abbrv] tinkerpop git commit: okay. mailboxes are controlled by ActorProgram.getMessagePriorities(). Took me forever to figure out how to dynamically configure akka mailboxes.
Date Wed, 11 Jan 2017 17:53:25 GMT
okay. mailboxes are controlled by ActorProgram.getMessagePriorities(). Took me forever to figure
out how to dynamically configure akka mailboxes.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2a04b625
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2a04b625
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2a04b625

Branch: refs/heads/TINKERPOP-1564
Commit: 2a04b6256b965227edd470ae4a7bdfb4ecba1a57
Parents: 47bee43
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Dec 13 12:58:54 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Jan 11 10:52:04 2017 -0700

----------------------------------------------------------------------
 .../gremlin/akka/process/actor/AkkaActors.java  |   9 +-
 .../gremlin/akka/process/actor/MasterActor.java |   1 +
 .../akka/process/actor/TraverserMailbox.java    | 104 ++++++++++---------
 .../gremlin/akka/process/actor/WorkerActor.java |   2 +-
 .../src/main/resources/application.conf         |   4 +
 .../tinkerpop/gremlin/process/actor/Actor.java  |   4 +
 .../gremlin/process/actor/ActorProgram.java     |   6 ++
 .../actor/traversal/TraversalActorProgram.java  |  27 +++++
 .../actor/traversal/TraversalMasterProgram.java |   6 +-
 .../actor/traversal/TraversalWorkerProgram.java |   8 +-
 .../traversal/WorkerTraversalSideEffects.java   |   1 +
 .../actor/traversal/message/Terminate.java      |  28 +++++
 12 files changed, 141 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index de301c1..5faa8d6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,6 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actor.Actors;
 import org.apache.tinkerpop.gremlin.process.actor.Address;
@@ -28,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -40,7 +44,10 @@ public final class AkkaActors<R> implements Actors<R> {
 
     public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
         this.actorProgram = actorProgram;
-        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
+        final Config config = ConfigFactory.defaultApplication().
+                withValue("message-priorities",
+                        ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
+        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config);
         this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString());
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index d7b45fa..8d61492 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -81,4 +81,5 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     public void close() {
         context().system().terminate();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
index 671f3a2..8251ab4 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -26,13 +26,15 @@ import akka.dispatch.MailboxType;
 import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import scala.Option;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -40,78 +42,77 @@ import java.util.Queue;
  */
 public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
 
+    private final List<Class> messagePriorities = new ArrayList<>();
+
+
     public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
-        private final Queue<Envelope> otherMessages = new LinkedList<>();
-        private final TraverserSet<?> traverserMessages = new TraverserSet<>();
-        private Envelope haltMessage = null;
-        private Envelope terminateToken = null;
-        private final ActorRef owner;
+        private final List<Queue> messages;
+        private final Map<Class, Queue> priorities;
         private final Object MUTEX = new Object();
 
-        public TraverserMessageQueue(final ActorRef owner) {
-            this.owner = owner;
+        public TraverserMessageQueue(final List<Class> messagePriorities) {
+            this.messages = new ArrayList<>(messagePriorities.size());
+            this.priorities = new HashMap<>(messagePriorities.size());
+            for (final Class clazz : messagePriorities) {
+                final Queue queue;
+                if (Traverser.class.isAssignableFrom(clazz))
+                    queue = new TraverserSet<>();
+                else
+                    queue = new LinkedList<>();
+                this.messages.add(queue);
+                this.priorities.put(clazz, queue);
+            }
         }
 
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             synchronized (MUTEX) {
-                if (handle.message() instanceof Traverser.Admin)
-                    this.traverserMessages.offer((Traverser.Admin) handle.message());
-                else if (handle.message() instanceof VoteToHaltMessage) {
-                    assert null == this.haltMessage;
-                    this.haltMessage = handle;
-                } else if (handle.message() instanceof TraversalWorkerProgram.Terminate) {
-                    assert null == this.terminateToken;
-                    this.terminateToken = handle;
-                } else
-                    this.otherMessages.offer(handle);
+                final Queue queue = this.priorities.get(Traverser.class.isAssignableFrom(handle.message().getClass()) ? Traverser.class : handle.message().getClass());
+                if (null == queue)
+                    throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
+                else
+                    queue.offer(handle.message() instanceof Traverser ? handle.message() : handle);
             }
         }
 
         public Envelope dequeue() {
             synchronized (MUTEX) {
-                if (!this.otherMessages.isEmpty())
-                    return this.otherMessages.poll();
-                if (!this.traverserMessages.isEmpty())
-                    return new Envelope(this.traverserMessages.poll(), this.owner);
-                else if (null != this.terminateToken) {
-                    final Envelope temp = this.terminateToken;
-                    this.terminateToken = null;
-                    return temp;
-                } else {
-                    final Envelope temp = this.haltMessage;
-                    this.haltMessage = null;
-                    return temp;
+                for (final Queue queue : this.messages) {
+                    if (!queue.isEmpty()) {
+                        final Object m = queue.poll();
+                        return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m;
+                    }
                 }
+                return null;
             }
         }
 
         public int numberOfMessages() {
             synchronized (MUTEX) {
-                return this.otherMessages.size() + this.traverserMessages.size() + (null == this.haltMessage ? 0 : 1) + (null == this.terminateToken ? 0 : 1);
+                int counter = 0;
+                for (final Queue queue : this.messages) {
+                    counter = counter + queue.size();
+                }
+                return counter;
             }
         }
 
         public boolean hasMessages() {
             synchronized (MUTEX) {
-                return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || null != this.haltMessage || this.terminateToken != null;
+                for (final Queue queue : this.messages) {
+                    if (!queue.isEmpty())
+                        return true;
+                }
+                return false;
             }
         }
 
         public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
             synchronized (MUTEX) {
-                for (final Envelope handle : this.otherMessages) {
-                    deadLetters.enqueue(owner, handle);
-                }
-                for (final Traverser.Admin<?> traverser : this.traverserMessages) {
-                    deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
-                }
-                if (null != this.haltMessage) {
-                    deadLetters.enqueue(owner, this.haltMessage);
-                    this.haltMessage = null;
-                }
-                if (null != this.terminateToken) {
-                    deadLetters.enqueue(owner, this.terminateToken);
-                    this.terminateToken = null;
+                for (final Queue queue : this.messages) {
+                    while (!queue.isEmpty()) {
+                        final Object m = queue.poll();
+                        deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m);
+                    }
                 }
             }
         }
@@ -119,12 +120,19 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
 
     // This constructor signature must exist, it will be called by Akka
     public TraverserMailbox(final ActorSystem.Settings settings, final Config config) {
-        // put your initialization code here
+        try {
+            final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(",");
+            for (final String clazz : messages) {
+                this.messagePriorities.add(Class.forName(clazz.trim()));
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
     }
 
     // The create method is called to create the MessageQueue
     public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
-        return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get());
+        return new TraverserMessageQueue(this.messagePriorities);
     }
 
     public static interface TraverserSetSemantics {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index 84dbe37..c023312 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -53,7 +53,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
         for (final Partition partition : partitioner.getPartitions()) {
             this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
         }
-        ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+        final ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
         receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
         workerProgram.setup();
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 706e3a6..a9bda12 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -6,6 +6,10 @@ akka.actor.mailbox.requirements {
   "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics"
= custom-dispatcher-mailbox
 }
 
+akka {
+  log-dead-letters-during-shutdown = "false"
+}
+
 custom-dispatcher-mailbox {
   mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index 2552883..ed627de 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -32,6 +32,8 @@ public interface Actor {
 
     public <M> void send(final Address toActor, final M message);
 
+
+
     public interface Master extends Actor {
 
         public List<Address.Worker> workers();
@@ -40,6 +42,7 @@ public interface Actor {
 
         public void close();
 
+
     }
 
     public interface Worker extends Actor {
@@ -51,6 +54,7 @@ public interface Actor {
         public List<Address.Worker> workers();
 
         public Partition partition();
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
index b8f7ac1..3ae54d1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
@@ -19,6 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.process.actor;
 
+import java.util.List;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -28,8 +30,11 @@ public interface ActorProgram<M> {
 
     public Master createMasterProgram(final Actor.Master master);
 
+    public List<Class> getMessagePriorities();
+
     public M getResult();
 
+
     public static interface Worker<M> {
         public void setup();
 
@@ -45,6 +50,7 @@ public interface ActorProgram<M> {
         public void execute(final M message);
 
         public void terminate();
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index e72b989..88eb670 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -21,20 +21,41 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal;
 
 import org.apache.tinkerpop.gremlin.process.actor.Actor;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
 
+    private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
+            StartMessage.class,
+            Traverser.class,
+            SideEffectAddMessage.class,
+            BarrierAddMessage.class,
+            SideEffectSetMessage.class,
+            BarrierDoneMessage.class,
+            Terminate.class,
+            VoteToHaltMessage.class);
+
     private final Traversal.Admin<?, R> traversal;
     private final Partitioner partitioner;
     public TraverserSet<R> result = new TraverserSet<>();
@@ -60,6 +81,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
     }
 
     @Override
+    public List<Class> getMessagePriorities() {
+        return MESSAGE_PRIORITIES;
+    }
+
+
+    @Override
     public TraverserSet<R> getResult() {
         return this.result;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index ba051e2..45fb6b9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -39,13 +39,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
+final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
 
     private final Actor.Master master;
     private final Traversal.Admin<?, ?> traversal;
@@ -144,6 +146,4 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
         else
             this.master.send(this.master.address(), traverser);
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 4275caa..898e191 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMe
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -49,12 +50,8 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
+final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
 
-    // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
-    public enum Terminate {
-        MAYBE, YES, NO
-    }
 
     private final Actor.Worker self;
     private final TraversalMatrix<?, ?> matrix;
@@ -186,5 +183,4 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         else
             this.self.send(this.self.address(), traverser);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
index 6ab66c4..0950435 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
@@ -73,6 +73,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects {
 
     @Override
     public void add(final String key, final Object value) {
+        this.sideEffects.add(key, value);
         this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2a04b625/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
new file mode 100644
index 0000000..4ab789d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum Terminate {
+
+    MAYBE, YES, NO
+}


Mime
View raw message