Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F2302200C10 for ; Thu, 19 Jan 2017 18:40:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id F11E8160B3A; Thu, 19 Jan 2017 17:40:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CEA6A160B63 for ; Thu, 19 Jan 2017 18:40:10 +0100 (CET) Received: (qmail 64323 invoked by uid 500); 19 Jan 2017 17:40:08 -0000 Mailing-List: contact commits-help@tinkerpop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.apache.org Delivered-To: mailing list commits@tinkerpop.apache.org Received: (qmail 63598 invoked by uid 99); 19 Jan 2017 17:40:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jan 2017 17:40:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9B277F402C; Thu, 19 Jan 2017 17:40:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: okram@apache.org To: commits@tinkerpop.apache.org Date: Thu, 19 Jan 2017 17:40:17 -0000 Message-Id: <9bc438278c944ca69a0f9bf84c0d5763@git.apache.org> In-Reply-To: <1564f448f51d410eb30f7141d1173c87@git.apache.org> References: <1564f448f51d410eb30f7141d1173c87@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/50] [abbrv] tinkerpop git commit: added Pushing and Distributing which are currently the two constructs of GraphComputing. GraphComputing is now deprecated with its corresponding methods calling the respective Pushing/Distributing methods. Pushing me archived-at: Thu, 19 Jan 2017 17:40:14 -0000 added Pushing and Distributing which are currently the two constructs of GraphComputing. GraphComputing is now deprecated with its corresponding methods calling the respective Pushing/Distributing methods. Pushing means the step is using push-based semantics. Distributing means that the step must be aware of whether it is executing in a master or worker traversal. Came up with a much cleaner way to configure these that doesn't rely on using VerificationStrategies and step-by-step setting. In short, once on a master or worker machine, configure yourself accordingly. Duh. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b5b72867 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b5b72867 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b5b72867 Branch: refs/heads/TINKERPOP-1564 Commit: b5b7286749e28b1b6151bb481de6970c7ef512e4 Parents: f41c69e Author: Marko A. Rodriguez Authored: Wed Dec 14 08:16:08 2016 -0700 Committer: Marko A. Rodriguez Committed: Thu Jan 19 10:26:57 2017 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/AkkaPlayTest.java | 10 +++- .../actor/traversal/TraversalMasterProgram.java | 8 +-- .../actor/traversal/TraversalWorkerProgram.java | 6 +- .../verification/ActorVerificationStrategy.java | 8 --- .../process/traversal/step/Bypassing.java | 3 + .../process/traversal/step/Distributing.java | 58 ++++++++++++++++++++ .../process/traversal/step/GraphComputing.java | 5 ++ .../gremlin/process/traversal/step/Pushing.java | 56 +++++++++++++++++++ .../traversal/step/filter/DedupGlobalStep.java | 42 +++++++++----- .../traversal/step/filter/RangeGlobalStep.java | 16 ++++-- .../traversal/step/filter/TailGlobalStep.java | 15 +++-- .../process/traversal/step/map/GraphStep.java | 22 +++++--- .../step/sideEffect/ProfileSideEffectStep.java | 18 ++++-- .../traversal/step/util/ComputerAwareStep.java | 20 +++++-- 14 files changed, 232 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java index 3874d06..6338706 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java @@ -28,6 +28,8 @@ import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; import org.junit.Test; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; /** @@ -40,7 +42,13 @@ public class AkkaPlayTest { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); GraphTraversalSource g = graph.traversal().withStrategies(new ActorProgramStrategy(AkkaGraphActors.class, new HashPartitioner(graph.partitioner(), 3))); - System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); + // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); + + for (int i = 0; i < 10000; i++) { + if(12l != g.V().union(out(), in()).values("name").count().next()) + System.out.println(i); + } + //3, 1.9, 1 /*for (int i = 0; i < 10000; i++) { final Graph graph = TinkerGraph.open(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java index 723339d..7846d53 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -32,11 +32,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep; import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; @@ -70,6 +70,8 @@ final class TraversalMasterProgram implements ActorProgram.Master { this.partitioner = partitioner; this.results = results; this.master = master; + Distributing.configure(this.traversal, true, true); + Pushing.configure(this.traversal, true, false); } @Override @@ -86,7 +88,6 @@ final class TraversalMasterProgram implements ActorProgram.Master { } else if (message instanceof BarrierAddMessage) { final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId()); final Step step = (Step) barrier; - GraphComputing.atMaster(step, true); barrier.addBarrier(((BarrierAddMessage) message).getBarrier()); this.barriers.put(step.getId(), barrier); } else if (message instanceof SideEffectAddMessage) { @@ -144,7 +145,6 @@ final class TraversalMasterProgram implements ActorProgram.Master { this.sendTraverser(traverser); } else { final Step step = this.matrix.>getStepById(traverser.getStepId()); - GraphComputing.atMaster(step, true); step.addStart(traverser); if (step instanceof Barrier) { this.barriers.put(step.getId(), (Barrier) step); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java index 08d2cff..dc03b7d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java @@ -32,7 +32,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; @@ -72,6 +74,9 @@ final class TraversalWorkerProgram implements ActorProgram.Worker { final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); this.matrix = new TraversalMatrix<>(traversal); + Distributing.configure(traversal, false, true); + Pushing.configure(traversal, true, false); + ////// final GraphStep graphStep = (GraphStep) traversal.getStartStep(); if (0 == graphStep.getIds().length) ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges); @@ -148,7 +153,6 @@ final class TraversalWorkerProgram implements ActorProgram.Worker { assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); final Step step = this.matrix.>getStepById(traverser.getStepId()); if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); - GraphComputing.atMaster(step, false); step.addStart(traverser); if (step instanceof Barrier) { this.barriers.put(step.getId(), (Barrier) step); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java index 30ea2c5..f6e93ef 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java @@ -47,14 +47,6 @@ public final class ActorVerificationStrategy extends AbstractTraversalStrategy traversal) { if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) throw new VerificationException("Inject traversal currently not supported", traversal); - - - final boolean globalChild = TraversalHelper.isGlobalChild(traversal); - for (final Step step : traversal.getSteps()) { - // only global children are graph computing - if (globalChild && step instanceof GraphComputing) - ((GraphComputing) step).onGraphComputer(); - } } public static ActorVerificationStrategy instance() { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java index 2e67ff7..a9f3a82 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java @@ -23,8 +23,11 @@ package org.apache.tinkerpop.gremlin.process.traversal.step; * This is useful in for steps that need to dynamically change their behavior on {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer}. * * @author Marko A. Rodriguez (http://markorodriguez.com) + * @deprecated As of release 3.3.0, replaced by {@link Distributing} */ +@Deprecated public interface Bypassing { + @Deprecated public void setBypass(final boolean bypass); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java new file mode 100644 index 0000000..42e1b44 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.traversal.step; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Distributing { + + /** + * Some steps should behave different whether they are executing at the master traversal or distributed across the worker traversals. + * + * @param atMaster whether the step is currently executing at master + */ + public void setAtMaster(final boolean atMaster); + + /** + * This static method will walk recursively traversal and set all {@link Distributing#setAtMaster(boolean)} accordingly. + * + * @param traversal the traversal to recursively walk with the assumption that the provided traversal is a global traversal. + * @param globalMasterDistributing whether global traversals should be treated as being at a master or worker step. + * @param localMasterDistribution whether local traversals should be treated as being at a master or worker step. + */ + public static void configure(final Traversal.Admin traversal, final boolean globalMasterDistributing, final boolean localMasterDistribution) { + for (final Step step : traversal.getSteps()) { + if (step instanceof Distributing) + ((Distributing) step).setAtMaster(globalMasterDistributing); + if (step instanceof TraversalParent) { + for (final Traversal.Admin global : ((TraversalParent) step).getGlobalChildren()) { + Distributing.configure(global, globalMasterDistributing, localMasterDistribution); + } + for (final Traversal.Admin local : ((TraversalParent) step).getLocalChildren()) { + Distributing.configure(local, localMasterDistribution, localMasterDistribution); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java index cec5708..8e2b3b3 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java @@ -27,12 +27,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; * This method is only called for global children steps of a {@link TraversalParent}. * * @author Marko A. Rodriguez (http://markorodriguez.com) + * @deprecated As of release 3.3.0, replaced by {@link Pushing} and {@link Distributing} */ +@Deprecated public interface GraphComputing { /** * The step will be executing on a {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer}. */ + @Deprecated public void onGraphComputer(); /** @@ -41,10 +44,12 @@ public interface GraphComputing { * * @param atMaster whether the step is currently executing at master */ + @Deprecated public default void atMaster(boolean atMaster) { } + @Deprecated public static void atMaster(final Step step, boolean atMaster) { if (step instanceof GraphComputing) ((GraphComputing) step).atMaster(atMaster); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java new file mode 100644 index 0000000..8d8ca9d --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.traversal.step; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Pushing { + + /** + * Set whether this step will be executing using a push-based model or the standard pull-based iterator model. + */ + public void setPushBased(final boolean pushBased); + + /** + * This static method will walk recursively traversal and set all {@link Pushing#setPushBased(boolean)} accordingly. + * + * @param traversal the traversal to recursively walk with the assumption that the provided traversal is a global traversal. + * @param globalPushBased the traverser propagation semantics (push or pull) of global children (typically push). + * @param localPushBased the traverser propagation semantics (push or pull) of local children (typically pull). + */ + public static void configure(final Traversal.Admin traversal, final boolean globalPushBased, final boolean localPushBased) { + for (final Step step : traversal.getSteps()) { + if (step instanceof Pushing) + ((Pushing) step).setPushBased(globalPushBased); + if (step instanceof TraversalParent) { + for (final Traversal.Admin global : ((TraversalParent) step).getGlobalChildren()) { + Pushing.configure(global, globalPushBased, localPushBased); + } + for (final Traversal.Admin local : ((TraversalParent) step).getLocalChildren()) { + Pushing.configure(local, localPushBased, localPushBased); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java index 96bd0be..8446540 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java @@ -25,8 +25,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.Scoping; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; @@ -50,16 +52,16 @@ import java.util.function.BinaryOperator; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class DedupGlobalStep extends FilterStep implements TraversalParent, Scoping, GraphComputing, Barrier>>, ByModulating, PathProcessor { +public final class DedupGlobalStep extends FilterStep implements TraversalParent, Scoping, Barrier>>, ByModulating, PathProcessor, Distributing, Pushing, GraphComputing { private Traversal.Admin dedupTraversal = null; private Set duplicateSet = new HashSet<>(); - private boolean onGraphComputer = false; private final Set dedupLabels; private Set keepLabels; - private boolean executingAtMaster = false; private Map> barrier; private Iterator>> barrierIterator; + private boolean atWorker = true; + private boolean pushBased = false; public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) { super(traversal); @@ -68,8 +70,8 @@ public final class DedupGlobalStep extends FilterStep implements Traversal @Override protected boolean filter(final Traverser.Admin traverser) { - if (this.onGraphComputer && !this.executingAtMaster) return true; - traverser.setBulk(1L); + if (this.pushBased && this.atWorker) return true; + traverser.setBulk(1); if (null == this.dedupLabels) { return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal)); } else { @@ -80,11 +82,6 @@ public final class DedupGlobalStep extends FilterStep implements Traversal } @Override - public void atMaster(final boolean atMaster) { - this.executingAtMaster = atMaster; - } - - @Override public ElementRequirement getMaxRequirement() { return null == this.dedupLabels ? ElementRequirement.ID : PathProcessor.super.getMaxRequirement(); } @@ -161,11 +158,6 @@ public final class DedupGlobalStep extends FilterStep implements Traversal } @Override - public void onGraphComputer() { - this.onGraphComputer = true; - } - - @Override public Set getScopeKeys() { return null == this.dedupLabels ? Collections.emptySet() : this.dedupLabels; } @@ -231,4 +223,24 @@ public final class DedupGlobalStep extends FilterStep implements Traversal public Set getKeepLabels() { return this.keepLabels; } + + @Override + public void setAtMaster(final boolean atMaster) { + this.atWorker = !atMaster; + } + + @Override + public void setPushBased(final boolean pushBased) { + this.pushBased = pushBased; + } + + @Override + public void onGraphComputer() { + this.setPushBased(true); + } + + @Override + public void atMaster(final boolean atMaster) { + this.setAtMaster(atMaster); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java index 9700870..c361321 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.Ranging; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; @@ -41,12 +42,12 @@ import java.util.function.BinaryOperator; * @author Bob Briody (http://bobbriody.com) * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class RangeGlobalStep extends FilterStep implements Ranging, Bypassing, Barrier> { +public final class RangeGlobalStep extends FilterStep implements Ranging, Bypassing, Barrier>, Distributing { private long low; private final long high; private AtomicLong counter = new AtomicLong(0l); - private boolean bypass; + private boolean atMaster = true; public RangeGlobalStep(final Traversal.Admin traversal, final long low, final long high) { super(traversal); @@ -59,7 +60,7 @@ public final class RangeGlobalStep extends FilterStep implements Ranging, @Override protected boolean filter(final Traverser.Admin traverser) { - if (this.bypass) return true; + if (!this.atMaster) return true; if (this.high != -1 && this.counter.get() >= this.high) { throw FastNoSuchElementException.instance(); @@ -146,7 +147,7 @@ public final class RangeGlobalStep extends FilterStep implements Ranging, @Override public void setBypass(final boolean bypass) { - this.bypass = bypass; + this.setAtMaster(!bypass); } @Override @@ -161,7 +162,7 @@ public final class RangeGlobalStep extends FilterStep implements Ranging, @Override public TraverserSet nextBarrier() throws NoSuchElementException { - if(!this.starts.hasNext()) + if (!this.starts.hasNext()) throw FastNoSuchElementException.instance(); final TraverserSet barrier = new TraverserSet<>(); while (this.starts.hasNext()) { @@ -178,6 +179,11 @@ public final class RangeGlobalStep extends FilterStep implements Ranging, }); } + @Override + public void setAtMaster(final boolean atMaster) { + this.atMaster = atMaster; + } + //////////////// public static final class RangeBiOperator implements BinaryOperator>, Serializable { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java index 2e31b1f..cc31e24 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; @@ -39,12 +40,12 @@ import java.util.Set; /** * @author Matt Frantz (http://github.com/mhfrantz) */ -public final class TailGlobalStep extends AbstractStep implements Bypassing, Barrier> { +public final class TailGlobalStep extends AbstractStep implements Bypassing, Distributing, Barrier> { private final long limit; private Deque> tail; private long tailBulk = 0L; - private boolean bypass = false; + private boolean atWorker = false; public TailGlobalStep(final Traversal.Admin traversal, final long limit) { super(traversal); @@ -52,13 +53,14 @@ public final class TailGlobalStep extends AbstractStep implements Bypas this.tail = new ArrayDeque<>((int) limit); } + @Override public void setBypass(final boolean bypass) { - this.bypass = bypass; + this.setAtMaster(!bypass); } @Override public Traverser.Admin processNextStart() { - if (this.bypass) { + if (this.atWorker) { // If we are bypassing this step, let everything through. return this.starts.next(); } else { @@ -160,4 +162,9 @@ public final class TailGlobalStep extends AbstractStep implements Bypas this.addStart(traverser); }); } + + @Override + public void setAtMaster(final boolean atMaster) { + this.atWorker = !atMaster; + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java index 87935d8..70f7925 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java @@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer; import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException; @@ -48,7 +49,7 @@ import java.util.function.Supplier; * @author Marko A. Rodriguez (http://markorodriguez.com) * @author Pieter Martin */ -public class GraphStep extends AbstractStep implements GraphComputing, AutoCloseable { +public class GraphStep extends AbstractStep implements GraphComputing, Pushing, AutoCloseable { protected final Class returnClass; protected Object[] ids; @@ -112,12 +113,6 @@ public class GraphStep extends AbstractStep implemen this.ids = new Object[0]; } - @Override - public void onGraphComputer() { - this.iteratorSupplier = Collections::emptyIterator; - convertElementsToIds(); - } - public void convertElementsToIds() { for (int i = 0; i < this.ids.length; i++) { // if this is going to OLAP, convert to ids so you don't serialize elements if (this.ids[i] instanceof Element) @@ -189,4 +184,17 @@ public class GraphStep extends AbstractStep implemen } return false; } + + @Override + public void onGraphComputer() { + this.setPushBased(true); + } + + @Override + public void setPushBased(final boolean pushBased) { + if (pushBased) { + this.iteratorSupplier = Collections::emptyIterator; + convertElementsToIds(); + } + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java index be60808..f165738 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -32,11 +33,11 @@ import java.util.function.Supplier; /** * @author Bob Briody (http://bobbriody.com) */ -public final class ProfileSideEffectStep extends SideEffectStep implements SideEffectCapable, GraphComputing { +public final class ProfileSideEffectStep extends SideEffectStep implements SideEffectCapable, GraphComputing, Pushing { public static final String DEFAULT_METRICS_KEY = Graph.Hidden.hide("metrics"); private String sideEffectKey; - private boolean onGraphComputer = false; + private boolean pushBased = false; public ProfileSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) { super(traversal); @@ -60,7 +61,7 @@ public final class ProfileSideEffectStep extends SideEffectStep implements start = super.next(); return start; } finally { - if (!this.onGraphComputer && start == null) { + if (!this.pushBased && start == null) { ((DefaultTraversalMetrics) this.getTraversal().getSideEffects().get(this.sideEffectKey)).setMetrics(this.getTraversal(), false); } } @@ -69,7 +70,7 @@ public final class ProfileSideEffectStep extends SideEffectStep implements @Override public boolean hasNext() { boolean start = super.hasNext(); - if (!this.onGraphComputer && !start) { + if (!this.pushBased && !start) { ((DefaultTraversalMetrics) this.getTraversal().getSideEffects().get(this.sideEffectKey)).setMetrics(this.getTraversal(), false); } return start; @@ -77,13 +78,18 @@ public final class ProfileSideEffectStep extends SideEffectStep implements @Override public DefaultTraversalMetrics generateFinalResult(final DefaultTraversalMetrics tm) { - if (this.onGraphComputer) + if (this.pushBased) tm.setMetrics(this.getTraversal(), true); return tm; } @Override public void onGraphComputer() { - onGraphComputer = true; + this.setPushBased(true); + } + + @Override + public void setPushBased(boolean pushBased) { + this.pushBased = pushBased; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b5b72867/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java index 5acff58..4adbfa2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.util; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator; @@ -30,7 +31,7 @@ import java.util.NoSuchElementException; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public abstract class ComputerAwareStep extends AbstractStep implements GraphComputing { +public abstract class ComputerAwareStep extends AbstractStep implements GraphComputing, Pushing { private Iterator> previousIterator = EmptyIterator.instance(); @@ -49,7 +50,12 @@ public abstract class ComputerAwareStep extends AbstractStep impleme @Override public void onGraphComputer() { - this.traverserStepIdAndLabelsSetByChild = true; + this.setPushBased(true); + } + + @Override + public void setPushBased(final boolean pushBased) { + this.traverserStepIdAndLabelsSetByChild = pushBased; } @Override @@ -63,9 +69,10 @@ public abstract class ComputerAwareStep extends AbstractStep impleme protected abstract Iterator> computerAlgorithm() throws NoSuchElementException; + ////// - public static class EndStep extends AbstractStep implements GraphComputing { + public static class EndStep extends AbstractStep implements GraphComputing, Pushing { public EndStep(final Traversal.Admin traversal) { super(traversal); @@ -88,7 +95,12 @@ public abstract class ComputerAwareStep extends AbstractStep impleme @Override public void onGraphComputer() { - this.traverserStepIdAndLabelsSetByChild = true; + this.setPushBased(true); + } + + @Override + public void setPushBased(final boolean pushBased) { + this.traverserStepIdAndLabelsSetByChild = pushBased; } }