tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [31/50] [abbrv] tinkerpop git commit: what a day. So I have Akka remoting working (not fully). We now have akka.io.GryoSerializer :). And TraversalActorProgram is smart about detach() and attach(). Learned a bunch about how to do Partitioner/Partitions i
Date Thu, 19 Jan 2017 17:40:37 GMT
What a day. So I have Akka remoting working (not fully). We now have akka.io.GryoSerializer :).
And TraversalActorProgram is smart about detach() and attach(). Learned a bunch about
how to do Partitioner/Partitions in TinkerPop... Stuff is coming along nicely. Need a break
though... been coding for 4 hours straight.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0e7b6ae1
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0e7b6ae1
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0e7b6ae1

Branch: refs/heads/TINKERPOP-1564
Commit: 0e7b6ae16c58f61b823a4cc02073a141d84b97bd
Parents: 63f5c0f
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Jan 11 14:38:13 2017 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Jan 19 10:27:16 2017 -0700

----------------------------------------------------------------------
 akka-gremlin/pom.xml                            | 11 +++
 .../akka/process/actors/AkkaGraphActors.java    |  8 +-
 .../akka/process/actors/MasterActor.java        |  6 +-
 .../akka/process/actors/io/GryoSerializer.java  | 90 ++++++++++++++++++++
 .../src/main/resources/application.conf         | 40 +++++++--
 .../akka/process/actors/AkkaPlayTest.java       | 11 +--
 .../gremlin/process/actors/ActorProgram.java    | 14 +++
 .../actors/traversal/TraversalActorProgram.java |  6 +-
 .../traversal/TraversalMasterProgram.java       | 15 +++-
 .../traversal/TraversalWorkerProgram.java       | 37 +++++---
 .../traversal/message/BarrierAddMessage.java    |  8 +-
 .../traversal/message/SideEffectAddMessage.java |  8 +-
 .../tinkerpop/gremlin/structure/Partition.java  |  4 +-
 .../gremlin/structure/util/Attachable.java      | 37 +++++++-
 .../util/partitioner/GlobalPartitioner.java     | 13 ++-
 .../util/config/SerializableConfiguration.java  | 77 +++++++++++++++++
 16 files changed, 347 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/akka-gremlin/pom.xml b/akka-gremlin/pom.xml
index f88ec7d..daebdfe 100644
--- a/akka-gremlin/pom.xml
+++ b/akka-gremlin/pom.xml
@@ -47,6 +47,17 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-remote_2.11</artifactId>
+            <version>2.4.14</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>2.11.8</version>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 3bd5fa6..acc06ff 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -37,11 +37,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -57,8 +57,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R>
{
     private boolean executed = false;
 
     private AkkaGraphActors(final Configuration configuration) {
-        this.configuration = new BaseConfiguration();
-        ConfigurationUtils.copy(configuration, this.configuration);
+        this.configuration = new SerializableConfiguration(configuration);
         this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName());
         GraphActorsHelper.configure(this, this.configuration);
     }
@@ -98,7 +97,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R>
{
                         stream().
                         map(Class::getCanonicalName).
                         collect(Collectors.toList()).toString()));
-        final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
+        final ActorSystem system = ActorSystem.create("traversal", config);
         final ActorsResult<R> result = new DefaultActorsResult<>();
         final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(),
this.workers);
         try {
@@ -126,5 +125,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R>
{
     public static AkkaGraphActors open() {
         return new AkkaGraphActors(new BaseConfiguration());
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 97951a8..b9c30bf 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -21,9 +21,12 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
 
 import akka.actor.AbstractActor;
 import akka.actor.ActorSelection;
+import akka.actor.AddressFromURIString;
+import akka.actor.Deploy;
 import akka.actor.Props;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
+import akka.remote.RemoteScope;
 import org.apache.tinkerpop.gremlin.process.actors.Actor;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
@@ -61,9 +64,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
         this.workers = new ArrayList<>();
         final List<Partition> partitions = partitioner.getPartitions();
         for (final Partition partition : partitions) {
+            akka.actor.Address addr = AddressFromURIString.parse("akka.tcp://traversal@127.0.0.1:2552");
             final String workerPathString = "worker-" + partition.id();
             this.workers.add(new Address.Worker(workerPathString, partition.location()));
-            context().actorOf(Props.create(WorkerActor.class, program, this.master, partition,
partitioner), workerPathString);
+            context().actorOf(Props.create(WorkerActor.class, program, this.master, partition,
partitioner).withDeploy(new Deploy(new RemoteScope(addr))), workerPathString);
         }
         this.masterProgram = program.createMasterProgram(this);
         receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
new file mode 100644
index 0000000..ab2b16a
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors.io;
+
+import akka.serialization.Serializer;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Option;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializer implements Serializer {
+
+    private final GryoPool gryoPool;
+
+    public GryoSerializer() {
+        this.gryoPool = GryoPool.build().
+                poolSize(100).
+                initializeMapper(builder ->
+                        builder.referenceTracking(true).
+                                registrationRequired(true).
+                                addCustom(
+                                        StartMessage.class,
+                                        BarrierAddMessage.class,
+                                        SideEffectAddMessage.class)).create();
+    }
+
+    @Override
+    public int identifier() {
+        return 0;
+    }
+
+    @Override
+    public byte[] toBinary(final Object object) {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final Output output = new Output(outputStream);
+        this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object));
+        output.flush();
+        return outputStream.toByteArray();
+    }
+
+    @Override
+    public boolean includeManifest() {
+        return true;
+    }
+
+    @Override
+    public Object fromBinary(byte[] bytes, Option<Class<?>> option) {
+        return option.isEmpty() ? this.fromBinary(bytes) : this.fromBinary(bytes, option.get());
+    }
+
+    @Override
+    public Object fromBinary(byte[] bytes) {
+        final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+        final Input input = new Input(inputStream);
+        return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input));
+    }
+
+    @Override
+    public Object fromBinary(byte[] bytes, Class<?> aClass) {
+        final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+        final Input input = new Input(inputStream);
+        return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 7ee599a..393881a 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -1,15 +1,43 @@
+akka {
+  log-dead-letters-during-shutdown = "false"
+}
+
+custom-dispatcher-mailbox {
+  mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
+}
+
 custom-dispatcher {
   mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics"
 }
 
-akka.actor.mailbox.requirements {
-  "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+akka.actor {
+  provider = local
+  serialize-messages = off
+  serializers {
+    gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+  }
+  serialization-bindings {
+    "org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage" = gryo
+    "org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage" = gryo
+    "org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram" = gryo
+  }
+  mailbox.requirements {
+    "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+  }
 }
 
-akka {
-  log-dead-letters-during-shutdown = "false"
+akka.remote {
+  enabled-transports = ["akka.remote.netty.tcp"]
+  netty.tcp {
+    hostname = "127.0.0.1"
+    port = 2552
+  }
 }
 
-custom-dispatcher-mailbox {
-  mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
+akka.cluster {
+  seed-nodes = [
+    "akka.tcp://traversal@127.0.0.1:2551",
+    "akka.tcp://traversal@127.0.0.1:2552"]
+
+  auto-down-unreachable-after = 10s
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
index d4562eb..c95f336 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors;
 import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.junit.Ignore;
@@ -30,6 +31,7 @@ import org.junit.Test;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,12 +44,11 @@ public class AkkaPlayTest {
         final Graph graph = TinkerGraph.open();
         graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
         GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3));
-        // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+     //  System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+
+        System.out.println(g.V().groupCount().by(T.label).toList());
+
 
-        for (int i = 0; i < 1000; i++) {
-            if (12l != g.V().union(out(), in()).values("name").count().next())
-                System.out.println(i);
-        }
 
         //3, 1.9, 1
         /*for (int i = 0; i < 10000; i++) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
index b1e3065..e3713ad 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
@@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.process.actors;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
+import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Optional;
 
@@ -99,6 +100,19 @@ public interface ActorProgram extends Cloneable {
     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
     public ActorProgram clone();
 
+    public static <A extends ActorProgram> A createActorProgram(final Graph graph,
final Configuration configuration) {
+        try {
+            final Class<A> actorProgramClass = (Class) Class.forName(configuration.getString(ACTOR_PROGRAM));
+            final Constructor<A> constructor = actorProgramClass.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            final A actorProgram = constructor.newInstance();
+            actorProgram.loadState(graph, configuration);
+            return actorProgram;
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     /**
      * The Worker program is executed by a worker process in the {@link GraphActors} system.
      * There are many workers and a single master.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 484b904..c97ffd7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -44,7 +44,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Repe
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -53,7 +55,7 @@ import java.util.Optional;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TraversalActorProgram<R> implements ActorProgram {
+public final class TraversalActorProgram<R> implements ActorProgram, Serializable {
 
     public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode";
 
@@ -68,9 +70,11 @@ public final class TraversalActorProgram<R> implements ActorProgram
{
 
     private Traversal.Admin<?, R> traversal;
     public TraverserSet<R> result = new TraverserSet<>();
+    private Configuration configuration;
 
     public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
         this.traversal = traversal;
+        this.configuration = new SerializableConfiguration(configuration);
         final TraversalStrategies strategies = this.traversal.getStrategies().clone();
         strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
         // TODO: make TinkerGraph/etc. strategies smart about actors

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index e447cdb..796e4c1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -44,6 +44,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -144,6 +145,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
     }
 
     private void processTraverser(final Traverser.Admin traverser) {
+        this.attachTraverser(traverser);
         if (traverser.isHalted() || traverser.get() instanceof Element) {
             this.sendTraverser(traverser);
         } else {
@@ -163,9 +165,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
         if (traverser.isHalted())
             this.results.add(traverser);
         else if (traverser.get() instanceof Element)
-            this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element)
traverser.get())), traverser);
+            this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element)
traverser.get())), this.detachTraverser(traverser));
         else
-            this.master.send(this.master.address(), traverser);
+            this.master.send(this.master.address(), this.detachTraverser(traverser));
     }
 
     private void orderBarrier(final Step step) {
@@ -176,4 +178,13 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object>
{
             barrier.addBarrier(rangingBarrier);
         }
     }
+
+    private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
+        return true ? traverser : traverser.detach();
+    }
+
+    private void attachTraverser(final Traverser.Admin traverser) {
+        if (false && traverser.get() instanceof Element)
+            traverser.attach(Attachable.Method.get(this.master.partitioner().getPartition((Element)
traverser.get())));
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 127322f..fa5645d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.HashMap;
@@ -99,10 +100,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
         //System.out.println(message + "::" + this.isLeader);
         if (message instanceof StartMessage) {
             // initial message from master that says: "start processing"
-            final GraphStep<?,?> step = (GraphStep) this.matrix.getTraversal().getStartStep();
+            final GraphStep<?, ?> step = (GraphStep) this.matrix.getTraversal().getStartStep();
             while (step.hasNext()) {
                 final Traverser.Admin<? extends Element> traverser = step.next();
-                this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(),
traverser);
+                this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(),
this.detachTraverser(traverser));
             }
         } else if (message instanceof Traverser.Admin) {
             this.processTraverser((Traverser.Admin) message);
@@ -147,20 +148,26 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
     //////////////
 
     private void processTraverser(final Traverser.Admin traverser) {
-        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element)
traverser.get());
-        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-        step.addStart(traverser);
-        if (step instanceof Barrier) {
-            this.barriers.put(step.getId(), (Barrier) step);
-        } else {
-            while (step.hasNext()) {
-                this.sendTraverser(step.next());
+        assert !(traverser.get() instanceof Element) || this.self.partition().contains((Element)
traverser.get());
+        if (traverser.isHalted())
+            this.sendTraverser(traverser);
+        else {
+            this.attachTraverser(traverser);
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object,
Object>>getStepById(traverser.getStepId());
+            step.addStart(traverser);
+            if (step instanceof Barrier) {
+                this.barriers.put(step.getId(), (Barrier) step);
+            } else {
+                while (step.hasNext()) {
+                    this.sendTraverser(step.next());
+                }
             }
         }
     }
 
     private void sendTraverser(final Traverser.Admin traverser) {
         this.voteToHalt = false;
+        this.detachTraverser(traverser);
         if (traverser.isHalted())
             this.self.send(this.self.master(), traverser);
         else if (traverser.get() instanceof Element && !this.self.partition().contains((Element)
traverser.get()))
@@ -168,4 +175,14 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object>
{
         else
             this.self.send(this.self.address(), traverser);
     }
+
+    private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
+        return true ? traverser : traverser.detach();
+    }
+
+    private final Traverser.Admin attachTraverser(final Traverser.Admin traverser) {
+        if (false)
+            traverser.attach(Attachable.Method.get(this.self.partition()));
+        return traverser;
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
index ac4c61d..ade6796 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -27,8 +27,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
  */
 public final class BarrierAddMessage {
 
-    private final Object barrier;
-    private final String stepId;
+    private Object barrier;
+    private  String stepId;
+
+    private BarrierAddMessage() {
+        // for serialization
+    }
 
     public BarrierAddMessage(final Barrier barrier) {
         this.barrier = barrier.nextBarrier();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
index 1c0a9de..bcc3223 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
@@ -24,8 +24,12 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
  */
 public final class SideEffectAddMessage {
 
-    private final String key;
-    private final Object value;
+    private String key;
+    private Object value;
+
+    private SideEffectAddMessage() {
+        // for serialization
+    }
 
     public SideEffectAddMessage(final String key, final Object value) {
         this.value = value;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
index f20b9fb..49389f1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
@@ -19,6 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.structure;
 
+import org.apache.tinkerpop.gremlin.structure.util.Host;
+
 import java.net.InetAddress;
 import java.net.URI;
 import java.util.Iterator;
@@ -32,7 +34,7 @@ import java.util.UUID;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Partition {
+public interface Partition extends Host {
 
     /**
      * Whether or not this element was, is, or will be contained in this partition.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
index fa999aa..f748ee6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -75,21 +76,27 @@ public interface Attachable<V> {
                 if (base instanceof Vertex) {
                     final Optional<Vertex> optional = hostVertexOrGraph instanceof Graph ?
                             Method.getVertex((Attachable<Vertex>) attachable, (Graph) hostVertexOrGraph) :
-                            Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph);
+                            hostVertexOrGraph instanceof Vertex ?
+                                    Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph) :
+                                    Method.getVertex((Attachable<Vertex>) attachable, (Partition) hostVertexOrGraph);
                     return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
                             Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
                             Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
                 } else if (base instanceof Edge) {
                     final Optional<Edge> optional = hostVertexOrGraph instanceof Graph ?
                             Method.getEdge((Attachable<Edge>) attachable, (Graph) hostVertexOrGraph) :
-                            Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph);
+                            hostVertexOrGraph instanceof Vertex ?
+                                    Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph) :
+                                    Method.getEdge((Attachable<Edge>) attachable, (Partition) hostVertexOrGraph);
                     return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
                             Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
                             Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
                 } else if (base instanceof VertexProperty) {
                     final Optional<VertexProperty> optional = hostVertexOrGraph instanceof Graph ?
                             Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Graph) hostVertexOrGraph) :
-                            Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph);
+                            hostVertexOrGraph instanceof Vertex ?
+                                    Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph) :
+                                    Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Partition) hostVertexOrGraph);
                     return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
                             Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
                             Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
@@ -178,6 +185,11 @@ public interface Attachable<V> {
             return ElementHelper.areEqual(attachableVertex.get(), hostVertex) ? Optional.of(hostVertex) : Optional.empty();
         }
 
+        public static Optional<Vertex> getVertex(final Attachable<Vertex> attachableVertex, final Partition hostPartition) {
+            final Iterator<Vertex> iterator = hostPartition.vertices(attachableVertex.get().id());
+            return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
+        }
+
         public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Graph hostGraph) {
             final Iterator<Edge> edgeIterator = hostGraph.edges(attachableEdge.get().id());
             return edgeIterator.hasNext() ? Optional.of(edgeIterator.next()) : Optional.empty();
@@ -194,6 +206,11 @@ public interface Attachable<V> {
             return Optional.empty();
         }
 
+        public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Partition hostPartition) {
+            final Iterator<Edge> iterator = hostPartition.edges(attachableEdge.get().id());
+            return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
+        }
+
         public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Graph hostGraph) {
             final VertexProperty baseVertexProperty = attachableVertexProperty.get();
             final Iterator<Vertex> vertexIterator = hostGraph.vertices(baseVertexProperty.element().id());
@@ -219,6 +236,20 @@ public interface Attachable<V> {
             return Optional.empty();
         }
 
+        public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Partition hostPartition) {
+            final VertexProperty baseVertexProperty = attachableVertexProperty.get();
+            final Iterator<Vertex> vertexIterator= hostPartition.vertices(baseVertexProperty.element().id());
+            if (vertexIterator.hasNext()) {
+                final Iterator<VertexProperty<Object>> vertexPropertyIterator = vertexIterator.next().properties(baseVertexProperty.key());
+                while (vertexPropertyIterator.hasNext()) {
+                    final VertexProperty vertexProperty = vertexPropertyIterator.next();
+                    if (ElementHelper.areEqual(vertexProperty, baseVertexProperty))
+                        return Optional.of(vertexProperty);
+                }
+            }
+            return Optional.empty();
+        }
+
         public static Optional<Property> getProperty(final Attachable<Property> attachableProperty, final Graph hostGraph) {
             final Property baseProperty = attachableProperty.get();
             final Element propertyElement = attachableProperty.get().element();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index 4d9f565..397c113 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -19,19 +19,24 @@
 
 package org.apache.tinkerpop.gremlin.structure.util.partitioner;
 
+import org.apache.commons.configuration.MapConfiguration;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,12 +66,14 @@ public final class GlobalPartitioner implements Partitioner {
 
     private class GlobalPartition implements Partition {
 
-        private final Graph graph;
+        private transient Graph graph;
+        private final Map<String, Object> configuration = new HashMap<>();
         private final String id;
         private final InetAddress location;
 
         private GlobalPartition(final Graph graph) {
             this.graph = graph;
+            graph.configuration().getKeys().forEachRemaining(key -> configuration.put(key, graph.configuration().getProperty(key)));
             this.id = "global-" + graph.getClass().getSimpleName().toLowerCase();
             try {
                 this.location = InetAddress.getLocalHost();
@@ -82,11 +89,15 @@ public final class GlobalPartitioner implements Partitioner {
 
         @Override
         public Iterator<Vertex> vertices(final Object... ids) {
+            if(null == this.graph)
+                this.graph = GraphFactory.open(new MapConfiguration(this.configuration));
             return this.graph.vertices(ids);
         }
 
         @Override
         public Iterator<Edge> edges(final Object... ids) {
+            if(null == this.graph)
+                this.graph = GraphFactory.open(new MapConfiguration(this.configuration));
             return this.graph.edges(ids);
         }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0e7b6ae1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
new file mode 100644
index 0000000..2a1eac1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.util.config;
+
+import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SerializableConfiguration extends AbstractConfiguration implements Serializable {
+
+    private final Map<String, Object> properties = new HashMap<>();
+
+    public SerializableConfiguration() {
+        super();
+        super.setDelimiterParsingDisabled(true);
+    }
+
+    public SerializableConfiguration(final Configuration configuration) {
+        this();
+        this.copy(configuration);
+    }
+
+    @Override
+    protected void addPropertyDirect(final String key, final Object value) {
+        this.properties.put(key, value);
+    }
+
+    @Override
+    protected void clearPropertyDirect(final String key) {
+        this.properties.remove(key);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return this.properties.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(final String key) {
+        return this.properties.containsKey(key);
+    }
+
+    @Override
+    public Object getProperty(final String key) {
+        return this.properties.get(key);
+    }
+
+    @Override
+    public Iterator<String> getKeys() {
+        return this.properties.keySet().iterator();
+    }
+
+}


Mime
View raw message