Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 12A4C200BDA for ; Tue, 13 Dec 2016 13:52:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 11259160B23; Tue, 13 Dec 2016 12:52:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3ACCE160B15 for ; Tue, 13 Dec 2016 13:52:13 +0100 (CET) Received: (qmail 73599 invoked by uid 500); 13 Dec 2016 12:52:12 -0000 Mailing-List: contact commits-help@tinkerpop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.apache.org Delivered-To: mailing list commits@tinkerpop.apache.org Received: (qmail 73590 invoked by uid 99); 13 Dec 2016 12:52:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Dec 2016 12:52:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52BECE3934; Tue, 13 Dec 2016 12:52:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: okram@apache.org To: commits@tinkerpop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tinkerpop git commit: started on the process.actor interfaces in gremlin-core. Really cool as the naming/model lines up nicely with process.computer interfaces. ActorStep and ActorStrategy are now generalized to work with Actors interface and now live in Date: Tue, 13 Dec 2016 12:52:12 +0000 (UTC) archived-at: Tue, 13 Dec 2016 12:52:15 -0000 Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 23c83e9b0 -> 07347b8df started on the process.actor interfaces in gremlin-core. Really cool as the naming/model lines up nicely with process.computer interfaces. ActorStep and ActorStrategy are now generalized to work with Actors interface and now live in gremlin-core. Pretty easy. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/07347b8d Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/07347b8d Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/07347b8d Branch: refs/heads/TINKERPOP-1564 Commit: 07347b8dfbeaa1a2358cc3e50b04bc2b2b5c2c9d Parents: 23c83e9 Author: Marko A. Rodriguez Authored: Tue Dec 13 05:52:06 2016 -0700 Committer: Marko A. Rodriguez Committed: Tue Dec 13 05:52:06 2016 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/actor/AkkaActors.java | 6 +- .../process/actor/MasterTraversalActor.java | 104 ++++++++++--------- .../process/actor/WorkerTraversalActor.java | 67 +++++++----- .../actor/message/SideEffectAddMessage.java | 18 ++-- .../actor/message/SideEffectSetMessage.java | 42 ++++++++ .../process/traversal/step/map/ActorStep.java | 76 -------------- .../strategy/decoration/ActorStrategy.java | 83 --------------- .../verification/ActorVerificationStrategy.java | 60 ----------- .../akka/process/AkkaActorsProvider.java | 5 +- .../gremlin/akka/process/AkkaPlayTest.java | 5 +- .../tinkerpop/gremlin/process/actor/Actor.java | 31 ++++++ .../tinkerpop/gremlin/process/actor/Actors.java | 32 ++++++ .../gremlin/process/actor/MasterActor.java | 35 +++++++ .../gremlin/process/actor/WorkerActor.java | 35 +++++++ .../actor/traversal/step/map/ActorStep.java | 80 ++++++++++++++ .../strategy/decoration/ActorStrategy.java | 86 +++++++++++++++ .../verification/ActorVerificationStrategy.java | 60 +++++++++++ 17 files changed, 516 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java index aa2a048..8ef96bb 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.akka.process.actor; import akka.actor.ActorSystem; import akka.actor.Props; +import org.apache.tinkerpop.gremlin.process.actor.Actors; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Partitioner; @@ -31,7 +32,7 @@ import java.util.concurrent.Future; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class AkkaActors { +public final class AkkaActors implements Actors { public final ActorSystem system; private TraverserSet results = new TraverserSet<>(); @@ -41,7 +42,8 @@ public final class AkkaActors { this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master"); } - public Future> getResults() { + @Override + public Future> submit() { return CompletableFuture.supplyAsync(() -> { while (!this.system.isTerminated()) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java index d1a29d8..5009554 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java @@ -28,9 +28,11 @@ import akka.japi.pf.ReceiveBuilder; import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; -import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification.ActorVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.actor.MasterActor; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; @@ -54,7 +56,7 @@ import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue { +public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue, MasterActor { private final Traversal.Admin traversal; private final TraversalMatrix matrix; @@ -81,47 +83,11 @@ public final class MasterTraversalActor extends AbstractActor implements Require this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); receive(ReceiveBuilder. - match(Traverser.Admin.class, traverser -> { - this.processTraverser(traverser); - }). - match(BarrierAddMessage.class, barrierMerge -> { - // get the barrier updates from the workers to synchronize against the master barrier - final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId()); - final Step step = (Step) barrier; - GraphComputing.atMaster(step, true); - barrier.addBarrier(barrierMerge.getBarrier()); - this.barriers.put(step.getId(), barrier); - }). - match(SideEffectAddMessage.class, sideEffect -> { - // get the side-effect updates from the workers to generate the master side-effects - this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue()); - }). - match(VoteToHaltMessage.class, voteToHalt -> { - assert !sender().equals(self()); - if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers.values()) { - final Step step = (Step) barrier; - if (!(barrier instanceof LocalBarrier)) { - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - } else { - this.traversal.getSideEffects().forEach((k, v) -> { - this.broadcast(new SideEffectAddMessage(k, v)); - }); - this.broadcast(new BarrierDoneMessage(barrier)); - barrier.done(); - } - } - this.barriers.clear(); - worker(this.leaderWorker).tell(StartMessage.instance(), self()); - } else { - while (this.traversal.hasNext()) { - this.results.add((Traverser.Admin) this.traversal.nextTraverser()); - } - context().system().terminate(); - } - }).build()); + match(Traverser.Admin.class, this::processTraverser). + match(BarrierAddMessage.class, barrierMerge -> this.processBarrierAdd((Barrier) this.matrix.getStepById(barrierMerge.getStepId()), barrierMerge.getBarrier())). + match(SideEffectAddMessage.class, sideEffect -> this.processSideEffectAdd(((SideEffectAddMessage) sideEffect).getKey(), ((SideEffectAddMessage) sideEffect).getValue())). + match(VoteToHaltMessage.class, voteToHalt -> this.processVoteToHalt()). + build()); } private void initializeWorkers() { @@ -137,13 +103,49 @@ public final class MasterTraversalActor extends AbstractActor implements Require this.workers.clear(); } - private void broadcast(final Object message) { - for (final Partition partition : this.partitioner.getPartitions()) { - worker("worker-" + partition.hashCode()).tell(message, self()); + @Override + public void processBarrierAdd(final Barrier barrier, final Object barrierAddition) { + final Step step = (Step) barrier; + GraphComputing.atMaster(step, true); + barrier.addBarrier(barrierAddition); + this.barriers.put(step.getId(), barrier); + } + + @Override + public void processSideEffectAdd(final String key, final Object value) { + this.traversal.getSideEffects().add(key, value); + } + + @Override + public void processVoteToHalt() { + assert !sender().equals(self()); + if (!this.barriers.isEmpty()) { + for (final Barrier barrier : this.barriers.values()) { + final Step step = (Step) barrier; + if (!(barrier instanceof LocalBarrier)) { + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + } else { + this.traversal.getSideEffects().forEach((k, v) -> { + this.broadcast(new SideEffectSetMessage(k, v)); + }); + this.broadcast(new BarrierDoneMessage(barrier)); + barrier.done(); + } + } + this.barriers.clear(); + worker(this.leaderWorker).tell(StartMessage.instance(), self()); + } else { + while (this.traversal.hasNext()) { + this.results.add((Traverser.Admin) this.traversal.nextTraverser()); + } + context().system().terminate(); } } - private void processTraverser(final Traverser.Admin traverser) { + @Override + public void processTraverser(final Traverser.Admin traverser) { if (traverser.isHalted() || traverser.get() instanceof Element) { this.sendTraverser(traverser); } else { @@ -156,6 +158,14 @@ public final class MasterTraversalActor extends AbstractActor implements Require } } + //////////////// + + private void broadcast(final Object message) { + for (final Partition partition : this.partitioner.getPartitions()) { + worker("worker-" + partition.hashCode()).tell(message, self()); + } + } + private void sendTraverser(final Traverser.Admin traverser) { if (traverser.isHalted()) this.results.add(traverser); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java index 63eb707..862b6b3 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java @@ -26,9 +26,10 @@ import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.actor.WorkerActor; import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; @@ -51,7 +52,7 @@ import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue { +public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue, WorkerActor { // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm) public enum Terminate { @@ -94,36 +95,15 @@ public final class WorkerTraversalActor extends AbstractActor implements Require this.isLeader = i == 0; receive(ReceiveBuilder. - match(StartMessage.class, start -> { - // initial message from master that says: "start processing" - final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - // internal vote to have in mailbox as final message to process - assert null == this.terminate; - if (this.isLeader) { - this.terminate = Terminate.MAYBE; - self().tell(VoteToHaltMessage.instance(), self()); - } - }). - match(Traverser.Admin.class, traverser -> { - this.processTraverser(traverser); - }). - match(SideEffectAddMessage.class, sideEffect -> { - this.matrix.getTraversal().getSideEffects().set(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue()); - }). + match(StartMessage.class, start -> this.processStart()). + match(Traverser.Admin.class, this::processTraverser). + match(SideEffectSetMessage.class, sideEffect -> this.processSideEffectSet(sideEffect.getKey(), sideEffect.getValue())). + match(BarrierDoneMessage.class, barrierDone -> this.processBarrierDone(this.matrix.getStepById(barrierDone.getStepId()))). match(Terminate.class, terminate -> { assert this.isLeader || this.terminate != Terminate.MAYBE; this.terminate = terminate; self().tell(VoteToHaltMessage.instance(), self()); }). - match(BarrierDoneMessage.class, barrierDone -> { - final Step step = this.matrix.>getStepById(barrierDone.getStepId()); - while (step.hasNext()) { - sendTraverser(step.next()); - } - }). match(VoteToHaltMessage.class, haltSync -> { // if there is a barrier and thus, halting at barrier, then process barrier if (!this.barriers.isEmpty()) { @@ -151,7 +131,23 @@ public final class WorkerTraversalActor extends AbstractActor implements Require ); } - private void processTraverser(final Traverser.Admin traverser) { + @Override + public void processStart() { + // initial message from master that says: "start processing" + final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + // internal vote to have in mailbox as final message to process + assert null == this.terminate; + if (this.isLeader) { + this.terminate = Terminate.MAYBE; + self().tell(VoteToHaltMessage.instance(), self()); + } + } + + @Override + public void processTraverser(final Traverser.Admin traverser) { assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); final Step step = this.matrix.>getStepById(traverser.getStepId()); if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); @@ -166,6 +162,21 @@ public final class WorkerTraversalActor extends AbstractActor implements Require } } + @Override + public void processBarrierDone(final Barrier barrier) { + final Step step = (Step) barrier; + while (step.hasNext()) { + sendTraverser(step.next()); + } + } + + @Override + public void processSideEffectSet(final String key, final Object value) { + this.matrix.getTraversal().getSideEffects().set(key, value); + } + + ////////////////////// + private void sendTraverser(final Traverser.Admin traverser) { this.voteToHalt = false; if (traverser.isHalted()) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java index 4a54d97..2d97bfa 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java @@ -24,20 +24,20 @@ package org.apache.tinkerpop.gremlin.akka.process.actor.message; */ public final class SideEffectAddMessage { - private final String sideEffectKey; - private final Object sideEffect; + private final String key; + private final Object value; - public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) { - this.sideEffect = sideEffect; - this.sideEffectKey = sideEffectKey; + public SideEffectAddMessage(final String key, final Object value) { + this.value = value; + this.key = key; } - public String getSideEffectKey() { - return this.sideEffectKey; + public String getKey() { + return this.key; } - public Object getSideEffectValue() { - return this.sideEffect; + public Object getValue() { + return this.value; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java new file mode 100644 index 0000000..023133b --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor.message; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SideEffectSetMessage { + + private final String key; + private final Object value; + + public SideEffectSetMessage(final String key, final Object value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return this.key; + } + + public Object getValue() { + return this.value; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java deleted file mode 100644 index db41493..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/step/map/ActorStep.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.traversal.step.map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ - -import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors; -import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; -import org.apache.tinkerpop.gremlin.structure.Partitioner; -import org.apache.tinkerpop.gremlin.structure.util.StringFactory; - -import java.util.NoSuchElementException; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorStep extends AbstractStep { - - public final Traversal.Admin partitionTraversal; - private final Partitioner partitioner; - private boolean first = true; - - public ActorStep(final Traversal.Admin traversal, final Partitioner partitioner) { - super(traversal); - this.partitionTraversal = (Traversal.Admin) traversal.clone(); - final TraversalStrategies strategies = this.partitionTraversal.getStrategies().clone(); - strategies.removeStrategies(ActorStrategy.class); - strategies.addStrategies(VertexProgramStrategy.instance()); - this.partitionTraversal.setStrategies(strategies); - this.partitioner = partitioner; - } - - @Override - public String toString() { - return StringFactory.stepString(this, this.partitionTraversal); - } - - @Override - protected Traverser.Admin processNextStart() throws NoSuchElementException { - if (this.first) { - this.first = false; - final AkkaActors actors = new AkkaActors<>(this.partitionTraversal, this.partitioner); - try { - actors.getResults().get().forEach(this.starts::add); - } catch (final Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - return this.starts.next(); - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java deleted file mode 100644 index adbc257..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/decoration/ActorStrategy.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration; - -import org.apache.tinkerpop.gremlin.akka.process.traversal.step.map.ActorStep; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; -import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.util.Collections; -import java.util.Set; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorStrategy extends AbstractTraversalStrategy - implements TraversalStrategy.DecorationStrategy { - - - private static final Set> PRIORS = Collections.singleton(RemoteStrategy.class); - private static final Set> POSTS = Collections.singleton(VertexProgramStrategy.class); - - private final Partitioner partitioner; - - public ActorStrategy(final Partitioner partitioner) { - this.partitioner = partitioner; - } - - @Override - public void apply(final Traversal.Admin traversal) { - ReadOnlyStrategy.instance().apply(traversal); - if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) - throw new VerificationException("Inject traversal currently not supported", traversal); - - if (!(traversal.getParent() instanceof EmptyStep)) - return; - - final ActorStep actorStep = new ActorStep<>(traversal, this.partitioner); - TraversalHelper.removeAllSteps(traversal); - traversal.addStep(actorStep); - - // validations - assert traversal.getStartStep().equals(actorStep); - assert traversal.getSteps().size() == 1; - assert traversal.getEndStep() == actorStep; - } - - @Override - public Set> applyPost() { - return POSTS; - } - - @Override - public Set> applyPrior() { - return PRIORS; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java deleted file mode 100644 index a9e3f7b..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/traversal/strategy/verification/ActorVerificationStrategy.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.verification; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.structure.Graph; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorVerificationStrategy extends AbstractTraversalStrategy implements TraversalStrategy.VerificationStrategy { - - private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy(); - - private ActorVerificationStrategy() { - } - - @Override - public void apply(final Traversal.Admin traversal) { - if (!TraversalHelper.onGraphComputer(traversal)) - return; - final boolean globalChild = TraversalHelper.isGlobalChild(traversal); - for (final Step step : traversal.getSteps()) { - // only global children are graph computing - if (globalChild && step instanceof GraphComputing) - ((GraphComputing) step).onGraphComputer(); - - for (String label : step.getLabels()) { - if (Graph.Hidden.isHidden(label)) - step.removeLabel(label); - } - } - } - - public static ActorVerificationStrategy instance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java index 9158040..19e0628 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java @@ -22,7 +22,8 @@ package org.apache.tinkerpop.gremlin.akka.process; import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.AbstractGraphProvider; import org.apache.tinkerpop.gremlin.LoadGraphWith; -import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors; +import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy; import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest; @@ -134,7 +135,7 @@ public class AkkaActorsProvider extends AbstractGraphProvider { //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance()); else { final GraphTraversalSource g = graph.traversal(); - return g.withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3))); + return g.withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3))); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java index b064331..bed7636 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java @@ -19,7 +19,8 @@ package org.apache.tinkerpop.gremlin.akka.process; -import org.apache.tinkerpop.gremlin.akka.process.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaActors; +import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -40,7 +41,7 @@ public class AkkaPlayTest { public void testPlay1() throws Exception { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); - GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(new HashPartitioner(graph.partitioner(), 3))); + GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3))); System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList()); //3, 1.9, 1 /*for (int i = 0; i < 10000; i++) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java new file mode 100644 index 0000000..aa2f429 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Actor { + + public void processTraverser(final Traverser.Admin traverser); + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java new file mode 100644 index 0000000..d9e257e --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; + +import java.util.concurrent.Future; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Actors { + + public Future> submit(); +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java new file mode 100644 index 0000000..87efe51 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface MasterActor extends Actor { + + public void processBarrierAdd(final Barrier barrier, final V barrierAddition); + + public void processSideEffectAdd(final String key, final V value); + + public void processVoteToHalt(); + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java new file mode 100644 index 0000000..6d4ca64 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface WorkerActor extends Actor { + + public void processStart(); + + public void processBarrierDone(final Barrier barrier); + + public void processSideEffectSet(final String key, final Object value); + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java new file mode 100644 index 0000000..e86810e --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ + +import org.apache.tinkerpop.gremlin.process.actor.Actors; +import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.util.StringFactory; + +import java.util.NoSuchElementException; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorStep extends AbstractStep { + + private final Class actorsClass; + private final Traversal.Admin partitionTraversal; + private final Partitioner partitioner; + + private boolean first = true; + + + public ActorStep(final Traversal.Admin traversal, final Class actorsClass, final Partitioner partitioner) { + super(traversal); + this.actorsClass = actorsClass; + this.partitionTraversal = (Traversal.Admin) traversal.clone(); + final TraversalStrategies strategies = this.partitionTraversal.getStrategies().clone(); + strategies.removeStrategies(ActorStrategy.class); + strategies.addStrategies(VertexProgramStrategy.instance()); + this.partitionTraversal.setStrategies(strategies); + this.partitioner = partitioner; + } + + @Override + public String toString() { + return StringFactory.stepString(this, this.partitionTraversal); + } + + @Override + protected Traverser.Admin processNextStart() throws NoSuchElementException { + if (this.first) { + this.first = false; + try { + final Actors actors = this.actorsClass.getConstructor(Traversal.Admin.class, Partitioner.class).newInstance(this.partitionTraversal, this.partitioner); + actors.submit().get().forEach(this.starts::add); + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + return this.starts.next(); + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java new file mode 100644 index 0000000..ca9f64c --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorStrategy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration; + +import org.apache.tinkerpop.gremlin.process.actor.Actors; +import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.ActorStep; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; +import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.Collections; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorStrategy extends AbstractTraversalStrategy + implements TraversalStrategy.DecorationStrategy { + + + private static final Set> PRIORS = Collections.singleton(RemoteStrategy.class); + private static final Set> POSTS = Collections.singleton(VertexProgramStrategy.class); + + private final Partitioner partitioner; + private final Class actors; + + public ActorStrategy(final Class actors, final Partitioner partitioner) { + this.actors = actors; + this.partitioner = partitioner; + } + + @Override + public void apply(final Traversal.Admin traversal) { + ReadOnlyStrategy.instance().apply(traversal); + if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) + throw new VerificationException("Inject traversal currently not supported", traversal); + + if (!(traversal.getParent() instanceof EmptyStep)) + return; + + final ActorStep actorStep = new ActorStep<>(traversal, this.actors, this.partitioner); + TraversalHelper.removeAllSteps(traversal); + traversal.addStep(actorStep); + + // validations + assert traversal.getStartStep().equals(actorStep); + assert traversal.getSteps().size() == 1; + assert traversal.getEndStep() == actorStep; + } + + @Override + public Set> applyPost() { + return POSTS; + } + + @Override + public Set> applyPrior() { + return PRIORS; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/07347b8d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java new file mode 100644 index 0000000..6e0b410 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.structure.Graph; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorVerificationStrategy extends AbstractTraversalStrategy implements TraversalStrategy.VerificationStrategy { + + private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy(); + + private ActorVerificationStrategy() { + } + + @Override + public void apply(final Traversal.Admin traversal) { + if (!TraversalHelper.onGraphComputer(traversal)) + return; + final boolean globalChild = TraversalHelper.isGlobalChild(traversal); + for (final Step step : traversal.getSteps()) { + // only global children are graph computing + if (globalChild && step instanceof GraphComputing) + ((GraphComputing) step).onGraphComputer(); + + for (String label : step.getLabels()) { + if (Graph.Hidden.isHidden(label)) + step.removeLabel(label); + } + } + } + + public static ActorVerificationStrategy instance() { + return INSTANCE; + } +}