Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A506B200CB6 for ; Thu, 29 Jun 2017 20:55:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A44E5160BC6; Thu, 29 Jun 2017 18:55:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EEB3D160C09 for ; Thu, 29 Jun 2017 20:55:04 +0200 (CEST) Received: (qmail 47744 invoked by uid 500); 29 Jun 2017 18:55:04 -0000 Mailing-List: contact commits-help@tinkerpop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.apache.org Delivered-To: mailing list commits@tinkerpop.apache.org Received: (qmail 47634 invoked by uid 99); 29 Jun 2017 18:55:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jun 2017 18:55:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E8C94E5E44; Thu, 29 Jun 2017 18:55:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: spmallette@apache.org To: commits@tinkerpop.apache.org Date: Thu, 29 Jun 2017 18:55:13 -0000 Message-Id: <074dfb58652f45439e42eca170384cb2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/29] tinkerpop git commit: Merge branch 'tp31' into tp32 archived-at: Thu, 29 Jun 2017 18:55:07 -0000 Merge branch 'tp31' into tp32 Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/94365853 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/94365853 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/94365853 Branch: refs/heads/TINKERPOP-1603 Commit: 943658531ba8a3bb8050c169ad2180f81cda96db Parents: f1aed80 e7f5ef7 Author: Marko A. Rodriguez Authored: Tue Jun 27 10:04:37 2017 -0600 Committer: Marko A. Rodriguez Committed: Tue Jun 27 10:04:37 2017 -0600 ---------------------------------------------------------------------- .../tinkerpop/gremlin/process/computer/GraphComputerTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/94365853/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java ---------------------------------------------------------------------- diff --cc gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java index e4b40e8,e2acd5e..380887b --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java @@@ -73,9 -52,7 +73,8 @@@ import static org.junit.Assert.assertEq import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeNoException; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@@ -1591,7 -1472,7 +1590,7 @@@ public class GraphComputerTest extends @Test @LoadGraphWith(MODERN) public void shouldSupportMultipleScopes() throws ExecutionException, InterruptedException { - final ComputerResult result = graph.compute().program(new MultiScopeVertexProgram()).submit().get(); - final ComputerResult result = graph.compute(graphComputerClass.get()).program(new MultiScopeVertexProgram()).submit().get(); ++ final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexProgram()).submit().get(); assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L); assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L); assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L); @@@ -1653,968 -1535,6 +1652,965 @@@ public GraphComputer.ResultGraph getPreferredResultGraph() { return GraphComputer.ResultGraph.NEW; } - + } + + ///////////////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportGraphFilter() throws Exception { + // if the graph computer does not support graph filter, then make sure its exception handling is correct + if (!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) { + try { + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")); + fail("Should throw an unsupported operation exception"); + } catch (final UnsupportedOperationException e) { + assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage()); + } + try { + graphProvider.getGraphComputer(graph).edges(__.outE().limit(10)); + fail("Should throw an unsupported operation exception"); + } catch (final UnsupportedOperationException e) { + assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage()); + } + return; + } + /// VERTEX PROGRAM + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + /// VERTEX PROGRAM + MAP REDUCE + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + /// MAP REDUCE ONLY + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(__.outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get(); + graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get(); + + // EXCEPTION HANDLING + try { + graphProvider.getGraphComputer(graph).vertices(__.out()); + fail(); + } catch (final IllegalArgumentException e) { + assertEquals(e.getMessage(), GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage()); + } + try { + graphProvider.getGraphComputer(graph).edges(__.out().outE()); + fail(); + } catch (final IllegalArgumentException e) { + assertEquals(e.getMessage(), GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.out().outE()).getMessage()); + } + } + + public static class VertexProgramM implements VertexProgram { + + public static final String SOFTWARE_ONLY = "softwareOnly"; + public static final String PEOPLE_ONLY = "peopleOnly"; + public static final String KNOWS_ONLY = "knowsOnly"; + public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly"; + public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly"; + public static final String VERTICES_ONLY = "verticesOnly"; + public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly"; + public static final String OUT_EDGES_ONLY = "outEdgesOnly"; + + private String state; + + public VertexProgramM() { + + } + + public VertexProgramM(final String state) { + this.state = state; + } + + @Override + public void setup(final Memory memory) { + + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + switch (this.state) { + case SOFTWARE_ONLY: { + assertEquals("software", vertex.label()); + assertFalse(vertex.edges(Direction.OUT).hasNext()); + assertTrue(vertex.edges(Direction.IN).hasNext()); + assertTrue(vertex.edges(Direction.IN, "created").hasNext()); + assertFalse(vertex.edges(Direction.IN, "knows").hasNext()); + break; + } + case PEOPLE_ONLY: { + assertEquals("person", vertex.label()); + assertFalse(vertex.edges(Direction.IN, "created").hasNext()); + assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0); + break; + } + case KNOWS_ONLY: { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if (vertex.value("name").equals("marko")) + assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + else if (vertex.value("name").equals("vadas")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case PEOPLE_KNOWS_ONLY: { + assertEquals("person", vertex.label()); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if (vertex.value("name").equals("marko")) + assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + else if (vertex.value("name").equals("vadas")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case PEOPLE_KNOWS_WELL_ONLY: { + assertEquals("person", vertex.label()); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created"))); + if (vertex.value("name").equals("marko")) { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"), 0.001); + } else if (vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + else if (vertex.value("name").equals("josh")) { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows"))); + assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"), 0.001); + } else { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows"))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case VERTICES_ONLY: { + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + break; + } + case ONE_OUT_EDGE_ONLY: { + if (vertex.label().equals("software") || vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + else { + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.OUT))); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN))); + assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + case OUT_EDGES_ONLY: { + if (vertex.label().equals("software") || vertex.value("name").equals("vadas")) + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH))); + else { + assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) > 0); + assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN))); + assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), IteratorUtils.count(vertex.edges(Direction.BOTH))); + } + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + } + + @Override + public boolean terminate(final Memory memory) { + return true; + } + + @Override + public Set getMessageScopes(Memory memory) { + return Collections.emptySet(); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.NOTHING; + } + + @Override + @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException") + public VertexProgramM clone() { + return new VertexProgramM(this.state); + } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + this.state = configuration.getString("state"); + } + + @Override + public void storeState(final Configuration configuration) { + configuration.setProperty("state", this.state); + VertexProgram.super.storeState(configuration); + } + + } + + private static class MapReduceJ implements MapReduce { + + private String state; + + public MapReduceJ() { + } + + public MapReduceJ(final String state) { + this.state = state; + } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + this.state = configuration.getString("state"); + } + + @Override + public void storeState(final Configuration configuration) { + configuration.setProperty("state", this.state); + MapReduce.super.storeState(configuration); + } + + @Override + @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException") + public MapReduceJ clone() { + return new MapReduceJ(this.state); + } + + @Override + public boolean doStage(final Stage stage) { + return true; + } + + @Override + public void map(final Vertex vertex, final MapEmitter emitter) { + emitter.emit(1); + switch (this.state) { + case VertexProgramM.SOFTWARE_ONLY: { + assertEquals("software", vertex.label()); + break; + } + case VertexProgramM.PEOPLE_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.KNOWS_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.PEOPLE_KNOWS_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: { + assertEquals("person", vertex.label()); + break; + } + case VertexProgramM.VERTICES_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.ONE_OUT_EDGE_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + case VertexProgramM.OUT_EDGES_ONLY: { + assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + } + + @Override + public void combine(final NullObject key, final Iterator values, final ReduceEmitter emitter) { + this.reduce(key, values, emitter); + } + + @Override + public void reduce(final NullObject key, final Iterator values, final ReduceEmitter emitter) { + int count = 0; + while (values.hasNext()) { + count = count + values.next(); + } + emitter.emit(count); + } + + @Override + public Integer generateFinalResult(final Iterator> keyValues) { + int counter = keyValues.next().getValue(); + assertFalse(keyValues.hasNext()); + + switch (this.state) { + case VertexProgramM.SOFTWARE_ONLY: { + assertEquals(2, counter); + break; + } + case VertexProgramM.PEOPLE_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.KNOWS_ONLY: { + assertEquals(6, counter); + break; + } + case VertexProgramM.PEOPLE_KNOWS_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: { + assertEquals(4, counter); + break; + } + case VertexProgramM.VERTICES_ONLY: { + assertEquals(6, counter); + break; + } + case VertexProgramM.ONE_OUT_EDGE_ONLY: { + assertEquals(6, counter); + break; + } + case VertexProgramM.OUT_EDGES_ONLY: { + assertEquals(6, counter); + break; + } + default: + throw new IllegalStateException("This is an illegal state for this test case: " + this.state); + } + return counter; + } + + @Override + public String getMemoryKey() { + return "a"; + } + } + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportJobChaining() throws Exception { + final ComputerResult result1 = graphProvider.getGraphComputer(graph) + .program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph1 = result1.graph(); + final Memory memory1 = result1.memory(); + assertEquals(5, memory1.getIteration()); + assertEquals(6, graph1.traversal().V().count().next().intValue()); + assertEquals(6, graph1.traversal().E().count().next().intValue()); + assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(18, graph1.traversal().V().values().count().next().intValue()); + // + final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass()) + .program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph2 = result2.graph(); + final Memory memory2 = result2.memory(); + assertTrue(memory2.getIteration() <= 4); + assertEquals(6, graph2.traversal().V().count().next().intValue()); + assertEquals(6, graph2.traversal().E().count().next().intValue()); + assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue()); + assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(24, graph2.traversal().V().values().count().next().intValue()); + // + final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass()) + .program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get(); + final Graph graph3 = result3.graph(); + final Memory memory3 = result3.memory(); + assertTrue(memory3.keys().contains("m")); + assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS)); + assertEquals(1, memory3.>get("m").size()); + assertEquals(6, memory3.>get("m").get(1l).intValue()); + List> traversers = IteratorUtils.list(memory3.get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator()); + assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(4l, traversers.stream().filter(s -> s.get().equals("person")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue()); + assertEquals(6, graph3.traversal().V().count().next().intValue()); + assertEquals(6, graph3.traversal().E().count().next().intValue()); + assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue()); + assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue()); + assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue()); + assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers + + // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job. + } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportPreExistingComputeKeys() throws Exception { + final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramN()).submit().get(); + result.graph().vertices().forEachRemaining(vertex -> { + if (vertex.label().equals("person")) { + if (vertex.value("name").equals("marko")) + assertEquals(32, vertex.value("age").intValue()); + else if (vertex.value("name").equals("peter")) + assertEquals(38, vertex.value("age").intValue()); + else if (vertex.value("name").equals("vadas")) + assertEquals(30, vertex.value("age").intValue()); + else if (vertex.value("name").equals("josh")) + assertEquals(35, vertex.value("age").intValue()); + else + throw new IllegalStateException("This vertex should not have been accessed: " + vertex); + } + }); + } + + private static class VertexProgramN extends StaticVertexProgram { + @Override + public void setup(final Memory memory) { + + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (vertex.label().equals("person")) + vertex.property(VertexProperty.Cardinality.single, "age", vertex.value("age") + 1); + } + + @Override + public boolean terminate(final Memory memory) { + return memory.getIteration() > 1; + } + + @Override + public Set getMessageScopes(final Memory memory) { + return Collections.emptySet(); + } + + @Override + public Set getVertexComputeKeys() { + return Collections.singleton(VertexComputeKey.of("age", false)); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportTransientKeys() throws Exception { + final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramO()).mapReduce(new MapReduceK()).submit().get(); + result.graph().vertices().forEachRemaining(vertex -> { + assertFalse(vertex.property("v1").isPresent()); + assertFalse(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldExist", vertex.value("v3")); + assertTrue(vertex.property("name").isPresent()); + if (vertex.label().equals("software")) + assertTrue(vertex.property("lang").isPresent()); + else + assertTrue(vertex.property("age").isPresent()); + assertEquals(3, IteratorUtils.count(vertex.properties())); + assertEquals(0, IteratorUtils.count(vertex.properties("v1"))); + assertEquals(0, IteratorUtils.count(vertex.properties("v2"))); + assertEquals(1, IteratorUtils.count(vertex.properties("v3"))); + assertEquals(1, IteratorUtils.count(vertex.properties("name"))); + }); + assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue()); + assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue()); + assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue()); + assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue()); + assertEquals(6l, result.graph().traversal().V().values("name").dedup().count().next().longValue()); + assertEquals(1l, result.graph().traversal().V().values("v3").dedup().count().next().longValue()); + assertEquals("shouldExist", result.graph().traversal().V().values("v3").dedup().next()); + /// + assertFalse(result.memory().exists("m1")); + assertFalse(result.memory().exists("m2")); + assertTrue(result.memory().exists("m3")); + assertEquals(24l, result.memory().get("m3").longValue()); + assertEquals(2, result.memory().keys().size()); // mapReduceK + } + + private static class VertexProgramO extends StaticVertexProgram { + + @Override + public void setup(final Memory memory) { + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (memory.isInitialIteration()) { + assertFalse(vertex.property("v1").isPresent()); + assertFalse(vertex.property("v2").isPresent()); + assertFalse(vertex.property("v3").isPresent()); + vertex.property("v1", "shouldNotExist"); + vertex.property("v2", "shouldNotExist"); + vertex.property("v3", "shouldExist"); + assertTrue(vertex.property("v1").isPresent()); + assertTrue(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldNotExist", vertex.value("v1")); + assertEquals("shouldNotExist", vertex.value("v2")); + assertEquals("shouldExist", vertex.value("v3")); + // + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + memory.add("m1", false); + memory.add("m2", true); + memory.add("m3", 2l); + // should still not exist as this pulls from the master memory + assertFalse(memory.exists("m1")); + assertFalse(memory.exists("m2")); + assertFalse(memory.exists("m3")); + + } else { + assertTrue(vertex.property("v1").isPresent()); + assertTrue(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertEquals("shouldNotExist", vertex.value("v1")); + assertEquals("shouldNotExist", vertex.value("v2")); + assertEquals("shouldExist", vertex.value("v3")); + // + assertTrue(memory.exists("m1")); + assertTrue(memory.exists("m2")); + assertTrue(memory.exists("m3")); + assertFalse(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(12l, memory.get("m3").longValue()); + memory.add("m1", true); + memory.add("m2", true); + memory.add("m3", 2l); + } + } + + @Override + public boolean terminate(final Memory memory) { + assertTrue(memory.exists("m1")); + assertTrue(memory.exists("m2")); + assertTrue(memory.exists("m3")); + if (memory.isInitialIteration()) { + assertFalse(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(12l, memory.get("m3").longValue()); + return false; + } else { + assertTrue(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(24l, memory.get("m3").longValue()); + return true; + } + } + + @Override + public Set getMessageScopes(final Memory memory) { + return Collections.emptySet(); + } + + @Override + public Set getMemoryComputeKeys() { + return new HashSet<>(Arrays.asList( + MemoryComputeKey.of("m1", Operator.or, true, true), + MemoryComputeKey.of("m2", Operator.and, true, true), + MemoryComputeKey.of("m3", Operator.sum, true, false))); + } + + @Override + public Set getVertexComputeKeys() { + return new HashSet<>(Arrays.asList( + VertexComputeKey.of("v1", true), + VertexComputeKey.of("v2", true), + VertexComputeKey.of("v3", false))); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + } + + public static class MapReduceK extends StaticMapReduce { + + @Override + public boolean doStage(final Stage stage) { + return stage.equals(Stage.MAP); + } + + @Override + public void map(final Vertex vertex, final MapEmitter emitter) { + assertFalse(vertex.property("v1").isPresent()); + assertFalse(vertex.property("v2").isPresent()); + assertTrue(vertex.property("v3").isPresent()); + assertTrue(vertex.property("name").isPresent()); + assertEquals(3, IteratorUtils.count(vertex.properties())); + assertEquals(3, IteratorUtils.count(vertex.values())); + } + + @Override + public String getMemoryKey() { + return "mapReduceK"; + } + + @Override + public Object generateFinalResult(final Iterator keyValues) { + return "anObject"; + } + } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSupportBroadcastKeys() throws Exception { + final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramP()).submit().get(); + assertTrue(result.memory().exists("m1")); + assertFalse(result.memory().exists("m2")); + assertFalse(result.memory().exists("m3")); + assertTrue(result.memory().exists("m4")); + assertTrue(result.memory().get("m1")); + assertEquals(-18, result.memory().get("m4").intValue()); + assertEquals(2, result.memory().keys().size()); + } + + private static class VertexProgramP extends StaticVertexProgram { + + @Override + public void setup(final Memory memory) { + assertFalse(memory.exists("m1")); // or + assertFalse(memory.exists("m2")); // and + assertFalse(memory.exists("m3")); // long + assertFalse(memory.exists("m4")); // int + memory.set("m1", false); + memory.set("m2", true); + memory.set("m3", 0l); + memory.set("m4", 0); + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (memory.isInitialIteration()) { + assertFalse(memory.exists("m1")); + assertTrue(memory.exists("m2")); + assertTrue(memory.get("m2")); + assertFalse(memory.exists("m3")); + assertTrue(memory.exists("m4")); + assertEquals(0, memory.get("m4").intValue()); + memory.add("m1", false); + memory.add("m2", true); + memory.add("m3", 1l); + memory.add("m4", -1); + } else { + assertFalse(memory.exists("m1")); // no broadcast + assertTrue(memory.exists("m2")); + assertFalse(memory.exists("m3")); // no broadcast + assertTrue(memory.exists("m4")); + try { + assertFalse(memory.get("m1")); + fail(); + } catch (final Exception e) { + validateException(Memory.Exceptions.memoryDoesNotExist("m1"), e); + } + assertTrue(memory.get("m2")); + try { + assertEquals(6l, memory.get("m3").longValue()); + fail(); + } catch (final Exception e) { + validateException(Memory.Exceptions.memoryDoesNotExist("m3"), e); + } + assertEquals(-6l, memory.get("m4").intValue()); + /// + memory.add("m1", true); + memory.add("m2", true); + memory.add("m3", 2l); + memory.add("m4", -2); + } + } + + @Override + public boolean terminate(final Memory memory) { + assertTrue(memory.exists("m1")); + assertTrue(memory.exists("m2")); + assertTrue(memory.exists("m3")); + assertTrue(memory.exists("m4")); + if (memory.isInitialIteration()) { + assertFalse(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(6l, memory.get("m3").longValue()); + assertEquals(-6, memory.get("m4").intValue()); + return false; + } else { + assertTrue(memory.get("m1")); + assertTrue(memory.get("m2")); + assertEquals(18l, memory.get("m3").longValue()); + assertEquals(-18, memory.get("m4").intValue()); + return true; + } + } + + @Override + public Set getMessageScopes(final Memory memory) { + return Collections.emptySet(); + } + + @Override + public Set getMemoryComputeKeys() { + return new HashSet<>(Arrays.asList( + MemoryComputeKey.of("m1", Operator.or, false, false), + MemoryComputeKey.of("m2", Operator.and, true, true), + MemoryComputeKey.of("m3", Operator.sum, false, true), + MemoryComputeKey.of("m4", Operator.sum, true, false))); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + } + + /////////////////////////////////// + + @Test + @LoadGraphWith(MODERN) + public void shouldSucceedWithProperTraverserRequirements() throws Exception { + + final VertexProgramQ vp = VertexProgramQ.build().property("pl").create(); + final Map> expected = new HashMap<>(); + expected.put("vadas", Collections.singletonList(2)); + expected.put("lop", Arrays.asList(2, 2, 2, 3)); + expected.put("josh", Collections.singletonList(2)); + expected.put("ripple", Arrays.asList(2, 3)); + + try { + g.V().repeat(__.out()).emit().program(vp).dedup() + .valueMap("name", "pl").forEachRemaining((Map map) -> { + + final String name = (String) ((List) map.get("name")).get(0); + final List pathLengths = (List) map.get("pl"); + assertTrue(expected.containsKey(name)); + final List expectedPathLengths = expected.remove(name); + assertTrue(expectedPathLengths.containsAll(pathLengths)); + assertTrue(pathLengths.containsAll(expectedPathLengths)); + }); + + assertTrue(expected.isEmpty()); + } catch (VerificationException ex) { + assumeNoException(ex); + } + } + + @Test + @LoadGraphWith(MODERN) + public void shouldFailWithImproperTraverserRequirements() throws Exception { + final VertexProgramQ vp = VertexProgramQ.build().property("pl").useTraverserRequirements(false).create(); + try { + g.V().repeat(__.out()).emit().program(vp).dedup() + .forEachRemaining((Vertex v) -> assertFalse(v.property("pl").isPresent())); + } catch (VerificationException ex) { + assumeNoException(ex); + } + } + + private static class VertexProgramQ implements VertexProgram { + + private static final String VERTEX_PROGRAM_Q_CFG_PREFIX = "gremlin.vertexProgramQ"; + private static final String PROPERTY_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".property"; + private static final String LENGTHS_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".lengths"; + private static final String USE_TRAVERSER_REQUIREMENTS_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".useTraverserRequirements"; + + private final static Set MEMORY_COMPUTE_KEYS = Collections.singleton( + MemoryComputeKey.of(LENGTHS_KEY, Operator.addAll, true, true) + ); + + private final Set elementComputeKeys; + private Configuration configuration; + private String propertyKey; + private Set traverserRequirements; + + private VertexProgramQ() { + elementComputeKeys = new HashSet<>(); + } + + public static Builder build() { + return new Builder(); + } + + static class Builder extends AbstractVertexProgramBuilder { + + private Builder() { + super(VertexProgramQ.class); + } + + @SuppressWarnings("unchecked") + @Override + public VertexProgramQ create(final Graph graph) { + if (graph != null) { + ConfigurationUtils.append(graph.configuration().subset(VERTEX_PROGRAM_Q_CFG_PREFIX), configuration); + } + return (VertexProgramQ) VertexProgram.createVertexProgram(graph, configuration); + } + + public VertexProgramQ create() { + return create(null); + } + + public Builder property(final String name) { + configuration.setProperty(PROPERTY_CFG_KEY, name); + return this; + } + + /** + * This is only configurable for the purpose of testing. In a real-world VP this would be a bad pattern. + */ + public Builder useTraverserRequirements(final boolean value) { + configuration.setProperty(USE_TRAVERSER_REQUIREMENTS_CFG_KEY, value); + return this; + } + } + + @Override + public void storeState(final Configuration config) { + VertexProgram.super.storeState(config); + if (configuration != null) { + ConfigurationUtils.copy(configuration, config); + } + } + + + @Override + public void loadState(final Graph graph, final Configuration config) { + configuration = new BaseConfiguration(); + if (config != null) { + ConfigurationUtils.copy(config, configuration); + } + propertyKey = configuration.getString(PROPERTY_CFG_KEY); + traverserRequirements = configuration.getBoolean(USE_TRAVERSER_REQUIREMENTS_CFG_KEY, true) + ? Collections.singleton(TraverserRequirement.PATH) : Collections.emptySet(); + elementComputeKeys.add(VertexComputeKey.of(propertyKey, false)); + } + + @Override + public void setup(final Memory memory) { + } + + @Override + public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) { + if (memory.isInitialIteration()) { + final Property haltedTraversers = vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS); + if (!haltedTraversers.isPresent()) return; + final Iterator iterator = haltedTraversers.value().iterator(); + if (iterator.hasNext()) { + while (iterator.hasNext()) { + final Traverser t = (Traverser) iterator.next(); + if (!(t.path() instanceof EmptyPath)) { + final int pathLength = t.path().size(); + final List> memoryValue = new LinkedList<>(); + memoryValue.add(Pair.with(vertex, pathLength)); + memory.add(LENGTHS_KEY, memoryValue); + } + } + } + } else { + if (memory.exists(LENGTHS_KEY)) { + final List> lengths = memory.get(LENGTHS_KEY); + for (final Pair pair : lengths) { + if (pair.getValue0().equals(vertex)) { + vertex.property(VertexProperty.Cardinality.list, propertyKey, pair.getValue1()); + } + } + } + } + } + + @Override + public boolean terminate(final Memory memory) { + return !memory.isInitialIteration(); + } + + @Override + public Set getMessageScopes(final Memory memory) { + return Collections.emptySet(); + } + + @Override + public Set getVertexComputeKeys() { + return elementComputeKeys; + } + + @Override + public Set getMemoryComputeKeys() { + return MEMORY_COMPUTE_KEYS; + } + + @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "CloneDoesntCallSuperClone"}) + @Override + public VertexProgram clone() { + return this; + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + + @Override + public Set getTraverserRequirements() { + return this.traverserRequirements; + } + + @Override + public Features getFeatures() { + return new Features() { + @Override + public boolean requiresVertexPropertyAddition() { + return true; + } + }; + } - - } }