tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject tinkerpop git commit: solved my actors race condition from Friday. I had it such that the master actor was not part of the termination token ring which was bad. Given that the master actor does send traversers, it should be able to VOTE_TO_HALT. Now it
Date Mon, 23 Jan 2017 16:09:21 GMT
Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 1c4f6f547 -> 15a122c30


solved my actors race condition from Friday. I had it such that the master actor was not
part of the termination token ring which was bad. Given that the master actor does send traversers,
it should be able to VOTE_TO_HALT. Now it is part of the termination ring and things work
as expected. I was able to expose 2 more tests from the ProcessTestSuite. The remaining 3
tests that are failing are most likely related and have to do with nested attachment/detachment.
Also, Terminate is now a simple two state enum -- YES or NO. No more MAYBE needed as master
is part of the ring.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/15a122c3
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/15a122c3
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/15a122c3

Branch: refs/heads/TINKERPOP-1564
Commit: 15a122c30d4ca5ec87f1e36bab83913fe8c6b1bc
Parents: 1c4f6f5
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Mon Jan 23 09:09:14 2017 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Mon Jan 23 09:09:14 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/AkkaActorsProvider.java |  2 --
 .../actors/traversal/TraversalActorProgram.java |  2 +-
 .../traversal/TraversalMasterProgram.java       | 34 ++++++++++++++------
 .../traversal/TraversalWorkerProgram.java       | 27 +++++-----------
 .../actors/traversal/message/Terminate.java     |  2 +-
 5 files changed, 35 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15a122c3/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index 34b4542..d3a2d6e 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -53,9 +53,7 @@ public class AkkaActorsProvider extends AbstractTinkerGraphProvider {
             "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
             "g_V_repeatXdedupX_timesX2X_count",
             "g_withSackXmap__map_cloneX_V_out_out_sackXmap_a_nameX_sack",
-            "g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X",
             "g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX",
-            "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
             "classicRecommendation",
             "coworkerSummaryOLTP",
             GraphTest.Traversals.class.getCanonicalName(),

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15a122c3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index c772ebc..b85bcb0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -70,11 +70,11 @@ public final class TraversalActorProgram<R> implements ActorProgram
{
 
     private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
             StartMessage.class,
+            BarrierDoneMessage.class,
             Traverser.class,
             SideEffectAddMessage.class,
             BarrierAddMessage.class,
             SideEffectSetMessage.class,
-            BarrierDoneMessage.class,
             Terminate.class);
 
     private Traversal.Admin<?, R> traversal;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15a122c3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 5947361..48b5138 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -39,6 +39,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
@@ -61,9 +62,10 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
     private Map<String, Barrier> barriers = new HashMap<>();
     private Set<String> sideEffects = new HashSet<>();
     private final TraverserSet<?> results;
-    private Address.Worker leaderWorker;
+    private Address.Worker neighborAddress;
     private int orderCounter = -1;
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+    private boolean voteToHalt = true;
 
     public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?,
?> traversal, final TraverserSet<?> results) {
         this.traversal = traversal;
@@ -78,12 +80,13 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
 
     @Override
     public void setup() {
-        this.leaderWorker = this.master.workers().get(0);
+        this.neighborAddress = this.master.workers().get(0);
         for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
             this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i),
this.master.workers().get(i));
         }
         this.broadcast(StartMessage.instance());
-        this.master.send(this.leaderWorker, Terminate.MAYBE);
+        this.voteToHalt = false;
+        this.master.send(this.neighborAddress, Terminate.NO);
     }
 
     @Override
@@ -96,18 +99,25 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
                 barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(),
this.master.partitioner().getGraph()));
             if (barrier instanceof SideEffectCapable)
                 this.sideEffects.add(((SideEffectCapable) barrier).getSideEffectKey());
+            if (barrier instanceof SideEffectCapStep)
+                this.sideEffects.addAll(((SideEffectCapStep) barrier).getSideEffectKeys());
             this.barriers.put(((Step) barrier).getId(), barrier);
         } else if (message instanceof SideEffectAddMessage) {
             final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
             this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), TraversalActorProgram.attach(sideEffectAddMessage.getValue(),
this.master.partitioner().getGraph()));
             this.sideEffects.add(sideEffectAddMessage.getKey());
         } else if (message instanceof Terminate) {
-            assert Terminate.YES == message;
-            if (!this.barriers.isEmpty() || !this.sideEffects.isEmpty()) {
+            if (message == Terminate.NO)
+                this.voteToHalt = false;
+            if (this.voteToHalt && !this.sideEffects.isEmpty()) {
                 // process all side-effect updates
                 for (final String key : this.sideEffects) {
                     this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
                 }
+                this.sideEffects.clear();
+                this.voteToHalt = false;
+                this.master.send(this.neighborAddress, Terminate.NO);
+            } else if (this.voteToHalt && !this.barriers.isEmpty()) {
                 // process all barriers
                 for (final Barrier barrier : this.barriers.values()) {
                     this.broadcast(new BarrierDoneMessage(barrier));
@@ -124,9 +134,12 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
                         }
                     }
                 }
-                this.sideEffects.clear();
                 this.barriers.clear();
-                this.master.send(this.leaderWorker, Terminate.MAYBE);
+                this.voteToHalt = false;
+                this.master.send(this.neighborAddress, Terminate.NO);
+            } else if (!this.voteToHalt) {
+                this.voteToHalt = true;
+                this.master.send(this.neighborAddress, Terminate.YES);
             } else {
                 while (this.traversal.hasNext()) {
                     final Traverser.Admin traverser = this.traversal.nextTraverser();
@@ -172,9 +185,12 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
     }
 
     private void sendTraverser(final Traverser.Admin traverser) {
-        if (traverser.isHalted())
+        if (traverser.isHalted()) {
             this.results.add(traverser);
-        else if (traverser.get() instanceof Element)
+            return;
+        }
+        this.voteToHalt = false;
+        if (traverser.get() instanceof Element)
             this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().find((Element)
traverser.get())), this.detachTraverser(traverser));
         else
             this.master.send(this.master.address(), this.detachTraverser(traverser));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15a122c3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 5ea7d07..9f0a3df 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -55,10 +55,8 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
     private final TraversalMatrix<?, ?> matrix;
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
     //
-    private Address.Worker neighborWorker;
-    private boolean isLeader;
-    private Terminate terminate = null;
-    private boolean voteToHalt = false;
+    private Address neighborAddress;
+    private boolean voteToHalt = true;
     private Map<String, Barrier> barriers = new HashMap<>();
 
     public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?>
traversal) {
@@ -88,8 +86,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
     public void setup() {
         // create termination ring topology
         final int i = this.self.workers().indexOf(this.self.address());
-        this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1
? 0 : i + 1);
-        this.isLeader = i == 0;
+        this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() :
this.self.workers().get(i + 1);
         for (int j = 0; j < this.self.partition().partitioner().getPartitions().size();
j++) {
             this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j),
this.self.workers().get(j));
         }
@@ -102,8 +99,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
             // initial message from master that says: "start processing"
             final GraphStep<?, ?> step = (GraphStep) this.matrix.getTraversal().getStartStep();
             while (step.hasNext()) {
-                final Traverser.Admin<? extends Element> traverser = step.next();
-                this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(),
this.detachTraverser(traverser));
+                this.sendTraverser(step.next());
             }
         } else if (message instanceof Traverser.Admin) {
             this.processTraverser((Traverser.Admin) message);
@@ -119,9 +115,8 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
             } else
                 ((Barrier) step).done();       // the master drains the global barrier
         } else if (message instanceof Terminate) {
-            assert null == this.terminate;
-            this.terminate = (Terminate) message;
-            if (!this.barriers.isEmpty()) {
+            final Terminate terminate = (Terminate) message;
+            if (this.voteToHalt && !this.barriers.isEmpty()) {
                 for (final Barrier barrier : this.barriers.values()) {
                     if (barrier instanceof LocalBarrier) {
                         barrier.processAllStarts();
@@ -133,16 +128,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
                     }
                 }
                 this.barriers.clear();
+                this.voteToHalt = false;
             }
             // use termination token to determine termination condition
-            if (this.isLeader) {
-                if (this.voteToHalt && Terminate.YES == this.terminate)
-                    this.self.send(this.self.master(), Terminate.YES);
-                else
-                    this.self.send(this.neighborWorker, this.voteToHalt ? Terminate.YES :
Terminate.NO);
-            } else
-                this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
-            this.terminate = null;
+            this.self.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
             this.voteToHalt = true;
         } else {
             throw new IllegalArgumentException("The following message is unknown: " + message);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15a122c3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
index 5621528..a4bde19 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
@@ -24,5 +24,5 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
  */
 public enum Terminate {
 
-    MAYBE, YES, NO
+    YES, NO
 }


Mime
View raw message