tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [14/50] [abbrv] tinkerpop git commit: okay, all direct references to akka URIs are removed from gremlin-core. I have one more thing to do with Message priorities. After that, clean, Javadoc, etc. Going to take a break for a bit first.
Date Wed, 11 Jan 2017 17:53:22 GMT
okay, all direct references to akka URIs are removed from gremlin-core. I have one more thing
to do with Message priorities. After that, clean, Javadoc, etc. Going to take a break for
a bit first.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/47bee430
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/47bee430
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/47bee430

Branch: refs/heads/TINKERPOP-1564
Commit: 47bee43072216f3b9eaefbcd9440a8b7c9908487
Parents: 113d6cf
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Dec 13 10:18:12 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Jan 11 10:52:04 2017 -0700

----------------------------------------------------------------------
 .../gremlin/akka/process/actor/AkkaActors.java  | 13 ++-
 .../gremlin/akka/process/actor/MasterActor.java | 84 ++++++++++++++++++
 .../process/actor/MasterTraversalActor.java     | 84 ------------------
 .../gremlin/akka/process/actor/WorkerActor.java | 91 ++++++++++++++++++++
 .../process/actor/WorkerTraversalActor.java     | 91 --------------------
 .../tinkerpop/gremlin/process/actor/Actors.java |  6 +-
 .../gremlin/process/actor/Address.java          | 11 ++-
 .../actor/traversal/TraversalActorProgram.java  | 16 ++--
 .../actor/traversal/TraversalMasterProgram.java | 19 ++--
 .../actor/traversal/TraversalWorkerProgram.java | 32 +++----
 .../actor/traversal/step/map/ActorStep.java     |  3 +-
 11 files changed, 222 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index db024f6..de301c1 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -24,7 +24,6 @@ import akka.actor.Props;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actor.Actors;
 import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.concurrent.CompletableFuture;
@@ -33,16 +32,16 @@ import java.util.concurrent.Future;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class AkkaActors<S, E> implements Actors<S, E> {
+public final class AkkaActors<R> implements Actors<R> {
 
-    private final ActorProgram actorProgram;
+    private final ActorProgram<R> actorProgram;
     private final ActorSystem system;
     private final Address.Master master;
 
-    public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) {
+    public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
         this.actorProgram = actorProgram;
         this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
-        this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString());
+        this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString());
     }
 
     @Override
@@ -51,12 +50,12 @@ public final class AkkaActors<S, E> implements Actors<S, E> {
     }
 
     @Override
-    public Future<TraverserSet<E>> submit() {
+    public Future<R> submit() {
         return CompletableFuture.supplyAsync(() -> {
             while (!this.system.isTerminated()) {
 
             }
-            return (TraverserSet) this.actorProgram.getResult();
+            return this.actorProgram.getResult();
         });
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
new file mode 100644
index 0000000..d7b45fa
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
+
+    private final Address.Master master;
+    private final List<Address.Worker> workers;
+    private final Map<Address, ActorSelection> actors = new HashMap<>();
+
+    public MasterActor(final ActorProgram program, final Partitioner partitioner) {
+        this.master = new Address.Master(self().path().toString());
+        this.workers = new ArrayList<>();
+        final List<Partition> partitions = partitioner.getPartitions();
+        for (final Partition partition : partitions) {
+            this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
+            context().actorOf(Props.create(WorkerActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
+        }
+        final ActorProgram.Master masterProgram = program.createMasterProgram(this);
+        receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
+        masterProgram.setup();
+    }
+
+    @Override
+    public <M> void send(final Address toActor, final M message) {
+        ActorSelection actor = this.actors.get(toActor);
+        if (null == actor) {
+            actor = context().actorSelection(toActor.location());
+            this.actors.put(toActor, actor);
+        }
+        actor.tell(message, self());
+    }
+
+    @Override
+    public List<Address.Worker> workers() {
+        return this.workers;
+    }
+
+    @Override
+    public Address.Master address() {
+        return this.master;
+    }
+
+    @Override
+    public void close() {
+        context().system().terminate();
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
deleted file mode 100644
index 6799a28..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
-
-    private final Address.Master master;
-    private final List<Address.Worker> workers;
-    private final Map<Address, ActorSelection> actors = new HashMap<>();
-
-    public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) {
-        this.master = new Address.Master(self().path().toString());
-        this.workers = new ArrayList<>();
-        final List<Partition> partitions = partitioner.getPartitions();
-        for (final Partition partition : partitions) {
-            this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
-            context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
-        }
-        final ActorProgram.Master masterProgram = program.createMasterProgram(this);
-        receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
-        masterProgram.setup();
-    }
-
-    @Override
-    public <M> void send(final Address toActor, final M message) {
-        ActorSelection actor = this.actors.get(toActor);
-        if (null == actor) {
-            actor = context().actorSelection(toActor.location());
-            this.actors.put(toActor, actor);
-        }
-        actor.tell(message, self());
-    }
-
-    @Override
-    public List<Address.Worker> workers() {
-        return this.workers;
-    }
-
-    @Override
-    public Address.Master address() {
-        return this.master;
-    }
-
-    @Override
-    public void close() {
-        context().system().terminate();
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
new file mode 100644
index 0000000..84dbe37
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -0,0 +1,91 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
+
+    private final Partition localPartition;
+    private final Address.Worker self;
+    private final Address.Master master;
+    private final List<Address.Worker> workers;
+    private final Map<Address, ActorSelection> actors = new HashMap<>();
+
+    public WorkerActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
+        this.localPartition = localPartition;
+        this.self = new Address.Worker("../worker-" + localPartition.hashCode());
+        this.master = new Address.Master(context().parent().path().toString());
+        this.workers = new ArrayList<>();
+        for (final Partition partition : partitioner.getPartitions()) {
+            this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
+        }
+        ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+        receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
+        workerProgram.setup();
+    }
+
+    @Override
+    public <M> void send(final Address toActor, final M message) {
+        ActorSelection actor = this.actors.get(toActor);
+        if (null == actor) {
+            actor = context().actorSelection(toActor.location());
+            this.actors.put(toActor, actor);
+        }
+        actor.tell(message, self());
+    }
+
+    @Override
+    public List<Address.Worker> workers() {
+        return this.workers;
+    }
+
+    @Override
+    public Partition partition() {
+        return this.localPartition;
+    }
+
+    @Override
+    public Address.Worker address() {
+        return this.self;
+    }
+
+    @Override
+    public Address.Master master() {
+        return this.master;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
deleted file mode 100644
index 5a6bae7..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
-
-    private final Partition localPartition;
-    private final Address.Worker self;
-    private final Address.Master master;
-    private final List<Address.Worker> workers;
-    private final Map<Address, ActorSelection> actors = new HashMap<>();
-
-    public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
-        this.localPartition = localPartition;
-        this.self = new Address.Worker(self().path().toString());
-        this.master = new Address.Master(context().parent().path().toString());
-        this.workers = new ArrayList<>();
-        for (final Partition partition : partitioner.getPartitions()) {
-            this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
-        }
-        ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
-        receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
-        workerProgram.setup();
-    }
-
-    @Override
-    public <M> void send(final Address toActor, final M message) {
-        ActorSelection actor = this.actors.get(toActor);
-        if (null == actor) {
-            actor = context().actorSelection(toActor.location());
-            this.actors.put(toActor, actor);
-        }
-        actor.tell(message, self());
-    }
-
-    @Override
-    public List<Address.Worker> workers() {
-        return this.workers;
-    }
-
-    @Override
-    public Partition partition() {
-        return this.localPartition;
-    }
-
-    @Override
-    public Address.Worker address() {
-        return this.self;
-    }
-
-    @Override
-    public Address.Master master() {
-        return this.master;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index 2e410ec..7b0c4a4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -19,16 +19,14 @@
 
 package org.apache.tinkerpop.gremlin.process.actor;
 
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-
 import java.util.concurrent.Future;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Actors<S, E> {
+public interface Actors<R> {
 
     public Address.Master master();
 
-    public Future<TraverserSet<E>> submit();
+    public Future<R> submit();
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
index c598eb7..ff45e30 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
@@ -36,15 +36,22 @@ public abstract class Address implements Serializable {
         return this.location;
     }
 
+    @Override
     public boolean equals(final Object other) {
         return other instanceof Address && ((Address) other).location.equals(this.location);
     }
 
+    @Override
     public int hashCode() {
         return this.location.hashCode();
     }
 
-    public static class Master extends Address {
+    @Override
+    public String toString() {
+        return this.location();
+    }
+
+    public static final class Master extends Address {
 
         public Master(final String location) {
             super(location);
@@ -52,7 +59,7 @@ public abstract class Address implements Serializable {
 
     }
 
-    public static class Worker extends Address {
+    public static final class Worker extends Address {
 
         public Worker(final String location) {
             super(location);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index 278fb3b..e72b989 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -33,24 +33,24 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TraversalActorProgram<M> implements ActorProgram<M> {
+public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
 
-    private final Traversal.Admin<?, ?> traversal;
+    private final Traversal.Admin<?, R> traversal;
     private final Partitioner partitioner;
-    public TraverserSet<?> result = new TraverserSet<>();
+    public TraverserSet<R> result = new TraverserSet<>();
 
-    public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+    public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) {
         this.partitioner = partitioner;
         final TraversalStrategies strategies = traversal.getStrategies().clone();
         strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
         strategies.addStrategies(ActorVerificationStrategy.instance());
         traversal.setStrategies(strategies);
         traversal.applyStrategies();
-        this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
+        this.traversal = (Traversal.Admin) ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
     }
 
     @Override
-    public Worker<M> createWorkerProgram(final Actor.Worker worker) {
+    public Worker createWorkerProgram(final Actor.Worker worker) {
         return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner);
     }
 
@@ -60,7 +60,7 @@ public final class TraversalActorProgram<M> implements ActorProgram<M> {
     }
 
     @Override
-    public M getResult() {
-        return (M) this.result;
+    public TraverserSet<R> getResult() {
+        return this.result;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index 654969b..ba051e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -48,32 +48,27 @@ import java.util.Map;
 public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
 
     private final Actor.Master master;
-    private final Map<String, Address.Worker> workers = new HashMap<>();
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
     private Map<String, Barrier> barriers = new HashMap<>();
     private final TraverserSet<?> results;
-    private final String leaderWorker;
+    private Address.Worker leaderWorker;
 
     public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
         this.traversal = traversal;
-        System.out.println("master[created]: " + master.address().location());
-        System.out.println(this.traversal);
+        //System.out.println("master[created]: " + master.address().location());
+        //System.out.println(this.traversal);
         this.matrix = new TraversalMatrix<>(this.traversal);
         this.partitioner = partitioner;
         this.results = results;
         this.master = master;
-        this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
     }
 
     @Override
     public void setup() {
-        for (final Address.Worker worker : master.workers()) {
-            this.workers.put(worker.location(), worker);
-        }
+        this.leaderWorker = this.master.workers().get(0);
         this.broadcast(StartMessage.instance());
-
     }
 
     @Override
@@ -105,7 +100,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
                     }
                 }
                 this.barriers.clear();
-                this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance());
+                this.master.send(this.leaderWorker, StartMessage.instance());
             } else {
                 while (this.traversal.hasNext()) {
                     this.results.add((Traverser.Admin) this.traversal.nextTraverser());
@@ -123,7 +118,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
     }
 
     private void broadcast(final Object message) {
-        for (final Address.Worker worker : this.workers.values()) {
+        for (final Address.Worker worker : this.master.workers()) {
             this.master.send(worker, message);
         }
     }
@@ -145,7 +140,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
         if (traverser.isHalted())
             this.results.add(traverser);
         else if (traverser.get() instanceof Element)
-            this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+            this.master.send(this.master.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser);
         else
             this.master.send(this.master.address(), traverser);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 58e06d6..4275caa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -61,8 +61,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
     private final Partition localPartition;
     private final Partitioner partitioner;
     //
-    private final Map<String, Address.Worker> workers = new HashMap<>();
-    private final String neighborWorker;
+    private Address.Worker neighborWorker;
     private boolean isLeader;
     private Terminate terminate = null;
     private boolean voteToHalt = false;
@@ -70,10 +69,10 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
 
     public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
         this.self = self;
-        System.out.println("worker[created]: " + this.self.address().location());
+        // System.out.println("worker[created]: " + this.self.address().location());
         // set up partition and traversal information
-        this.localPartition = self.partition();
         this.partitioner = partitioner;
+        this.localPartition = self.partition();
         final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
         TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
         this.matrix = new TraversalMatrix<>(traversal);
@@ -88,19 +87,14 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
                 ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
                         () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
         }
-        // create termination ring topology
-        final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
-        this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
-        this.isLeader = i == 0;
-        for (final Address.Worker worker : self.workers()) {
-            //if (!worker.equals(this.self.address()))
-            this.workers.put(worker.location(), worker);
-        }
     }
 
     @Override
     public void setup() {
-
+        // create termination ring topology
+        final int i = this.self.workers().indexOf(this.self.address());
+        this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
+        this.isLeader = i == 0;
     }
 
     @Override
@@ -113,7 +107,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
                 this.sendTraverser(step.next());
             }
             // internal vote to have in mailbox as final message to process
-            // assert null == this.terminate;
+            assert null == this.terminate;
             if (this.isLeader) {
                 this.terminate = Terminate.MAYBE;
                 this.self.send(this.self.address(), VoteToHaltMessage.instance());
@@ -125,7 +119,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         } else if (message instanceof SideEffectSetMessage) {
             this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
         } else if (message instanceof Terminate) {
-            // assert this.isLeader || this.terminate != Terminate.MAYBE;
+            assert this.isLeader || this.terminate != Terminate.MAYBE;
             this.terminate = (Terminate) message;
             this.self.send(this.self.address(), VoteToHaltMessage.instance());
         } else if (message instanceof VoteToHaltMessage) {
@@ -145,9 +139,9 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
                     if (this.voteToHalt && Terminate.YES == this.terminate)
                         this.self.send(this.self.master(), VoteToHaltMessage.instance());
                     else
-                        this.self.send(this.workers.get(this.neighborWorker), Terminate.YES);
+                        this.self.send(this.neighborWorker, Terminate.YES);
                 } else
-                    this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO);
+                    this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
                 this.terminate = null;
                 this.voteToHalt = true;
             }
@@ -169,7 +163,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
     //////////////
 
     private void processTraverser(final Traverser.Admin traverser) {
-        // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
         final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
         if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
         GraphComputing.atMaster(step, false);
@@ -188,7 +182,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         if (traverser.isHalted())
             this.self.send(this.self.master(), traverser);
         else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
-            this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+            this.self.send(this.self.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser);
         else
             this.self.send(this.self.address(), traverser);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/47bee430/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
index 77be06b..207dd57 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
@@ -70,7 +71,7 @@ public final class ActorStep<S, E> extends AbstractStep<E, E> {
         if (this.first) {
             this.first = false;
             try {
-                final Actors<S, E> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram(this.partitionTraversal, partitioner), this.partitioner);
+                final Actors<TraverserSet<E>> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram<E>(this.partitionTraversal, partitioner), this.partitioner);
                 actors.submit().get().forEach(this.starts::add);
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);


Mime
View raw message