tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From spmalle...@apache.org
Subject [27/46] tinkerpop git commit: fixed up the MultiScope test to use MODERN so that it works with Giraph and Spark. Will back tweak to tp31/.
Date Wed, 28 Jun 2017 19:14:58 GMT
fixed up the MultiScope test to use MODERN so that it works with Giraph and Spark. Will back
tweak to tp31/.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f1aed80b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f1aed80b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f1aed80b

Branch: refs/heads/TINKERPOP-1489
Commit: f1aed80b056c2244ce8b23ab077e8b10ee6939d9
Parents: 5ac61b7 3b8c628
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Jun 27 09:50:15 2017 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Tue Jun 27 09:50:15 2017 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  2 +
 .../process/computer/GraphComputerTest.java     | 75 ++++++++++++++++++--
 .../process/computer/TinkerMessageBoard.java    |  4 +-
 .../process/computer/TinkerMessenger.java       | 22 +++---
 4 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/CHANGELOG.asciidoc
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --cc gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 5c66673,02ac5d4..e4b40e8
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@@ -24,35 -22,18 +24,32 @@@ import org.apache.commons.configuration
  import org.apache.tinkerpop.gremlin.ExceptionCoverage;
  import org.apache.tinkerpop.gremlin.LoadGraphWith;
  import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
 +import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
  import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
  import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 +import org.apache.tinkerpop.gremlin.process.traversal.P;
- import org.apache.tinkerpop.gremlin.process.traversal.Path;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
  import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath;
 +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
 +import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
  import org.apache.tinkerpop.gremlin.structure.Direction;
 -import org.apache.tinkerpop.gremlin.structure.Element;
 +import org.apache.tinkerpop.gremlin.structure.Edge;
  import org.apache.tinkerpop.gremlin.structure.Graph;
 +import org.apache.tinkerpop.gremlin.structure.Property;
  import org.apache.tinkerpop.gremlin.structure.Vertex;
  import org.apache.tinkerpop.gremlin.structure.VertexProperty;
  import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
  import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 +import org.javatuples.Pair;
- import org.junit.Ignore;
  import org.junit.Test;
  
- import java.util.AbstractMap;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
@@@ -70,15 -50,11 +68,14 @@@ import java.util.concurrent.Future
  
  import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
  import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
- import static org.apache.tinkerpop.gremlin.structure.T.id;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
 +import static org.junit.Assume.assumeNoException;
 +
+ 
  /**
   * @author Marko A. Rodriguez (http://markorodriguez.com)
   */
@@@ -1588,567 -1468,92 +1585,635 @@@ public class GraphComputerTest extends 
          public void storeState(final Configuration configuration) {
              VertexProgram.super.storeState(configuration);
          }
+     }
+ 
+     /////////////////////////////////////////////
+     @Test
++    @LoadGraphWith(MODERN)
+     public void shouldSupportMultipleScopes() throws ExecutionException, InterruptedException
{
 -        Vertex a = graph.addVertex("a");
 -        Vertex b = graph.addVertex("b");
 -        Vertex c = graph.addVertex("c");
 -        a.addEdge("edge", b);
 -        b.addEdge("edge", c);
 -
 -        // Simple graph:
 -        // a -> b -> c
 -
 -        // Execute a traversal program that sends an incoming message of "2" and an outgoing
message of "1" from "b"
 -        // then each vertex sums any received messages
 -        ComputerResult result = graph.compute().program(new MultiScopeVertexProgram()).submit().get();
 -
 -        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
 -        assertEquals((Long) result.graph().traversal().V().hasLabel("a").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(2L));
 -        assertEquals((Long) result.graph().traversal().V().hasLabel("b").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(0L));
 -        assertEquals((Long) result.graph().traversal().V().hasLabel("c").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),Long.valueOf(1L));
++        final ComputerResult result = graph.compute().program(new MultiScopeVertexProgram()).submit().get();
++        assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),
0L);
++        assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),
1L);
++        assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),
1L);
++        assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(),
2L);
+     }
+ 
 -    public static class MultiScopeVertexProgram implements VertexProgram<Long> {
++    public static class MultiScopeVertexProgram extends StaticVertexProgram<Long>
{
+ 
+         private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
+         private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
+ 
+         private static final String MEMORY_KEY = "count";
+ 
 -        private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);
+ 
+         @Override
 -        public void setup(final Memory memory) {}
++        public void setup(final Memory memory) {
++        }
+ 
+         @Override
+         public GraphComputer.Persist getPreferredPersist() {
+             return GraphComputer.Persist.VERTEX_PROPERTIES;
+         }
+ 
+         @Override
 -        public Set<String> getElementComputeKeys() {
 -            return COMPUTE_KEYS;
++        public Set<VertexComputeKey> getVertexComputeKeys() {
++            return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
+         }
+ 
+         @Override
+         public Set<MessageScope> getMessageScopes(final Memory memory) {
+             HashSet<MessageScope> scopes = new HashSet<>();
+             scopes.add(countMessageScopeIn);
+             scopes.add(countMessageScopeOut);
+             return scopes;
+         }
+ 
+         @Override
+         public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory)
{
+             switch (memory.getIteration()) {
+                 case 0:
 -                    if (vertex.label().equals("b")) {
++                    if (vertex.value("name").equals("josh")) {
+                         messenger.sendMessage(this.countMessageScopeIn, 2L);
+                         messenger.sendMessage(this.countMessageScopeOut, 1L);
+                     }
+                     break;
+                 case 1:
+                     long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L,
(a, b) -> a + b);
+                     vertex.property(MEMORY_KEY, edgeCount);
+                     break;
+             }
+         }
+ 
+         @Override
+         public boolean terminate(final Memory memory) {
+             return memory.getIteration() == 1;
+         }
+ 
+         @Override
+         public GraphComputer.ResultGraph getPreferredResultGraph() {
+             return GraphComputer.ResultGraph.NEW;
+         }
  
 -        @Override
 -        public MultiScopeVertexProgram clone() {
 +    }
 +
 +    /////////////////////////////////////////////
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportGraphFilter() throws Exception {
 +        // if the graph computer does not support graph filter, then make sure its exception
handling is correct
 +        if (!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) {
              try {
 -                return (MultiScopeVertexProgram) super.clone();
 -            } catch (final CloneNotSupportedException e) {
 -                throw new RuntimeException(e);
 +                graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software"));
 +                fail("Should throw an unsupported operation exception");
 +            } catch (final UnsupportedOperationException e) {
 +                assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(),
e.getMessage());
 +            }
 +            try {
 +                graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(10));
 +                fail("Should throw an unsupported operation exception");
 +            } catch (final UnsupportedOperationException e) {
 +                assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(),
e.getMessage());
 +            }
 +            return;
 +        }
 +        /// VERTEX PROGRAM
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new
VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new
VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new
VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new
VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new
VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        /// VERTEX PROGRAM + MAP REDUCE
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new
VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new
VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new
MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new
VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new
VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new
VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new
MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        /// MAP REDUCE ONLY
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new
MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new
MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new
MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight",
P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).mapReduce(new
MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).mapReduce(new
MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
 +        graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
 +
 +        // EXCEPTION HANDLING
 +        try {
 +            graphProvider.getGraphComputer(graph).vertices(__.out());
 +            fail();
 +        } catch (final IllegalArgumentException e) {
 +            assertEquals(e.getMessage(), GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage());
 +        }
 +        try {
 +            graphProvider.getGraphComputer(graph).edges(__.<Vertex>out().outE());
 +            fail();
 +        } catch (final IllegalArgumentException e) {
 +            assertEquals(e.getMessage(), GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.<Vertex>out().outE()).getMessage());
 +        }
 +    }
 +
 +    public static class VertexProgramM implements VertexProgram {
 +
 +        public static final String SOFTWARE_ONLY = "softwareOnly";
 +        public static final String PEOPLE_ONLY = "peopleOnly";
 +        public static final String KNOWS_ONLY = "knowsOnly";
 +        public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly";
 +        public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly";
 +        public static final String VERTICES_ONLY = "verticesOnly";
 +        public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly";
 +        public static final String OUT_EDGES_ONLY = "outEdgesOnly";
 +
 +        private String state;
 +
 +        public VertexProgramM() {
 +
 +        }
 +
 +        public VertexProgramM(final String state) {
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public void setup(final Memory memory) {
 +
 +        }
 +
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, final Memory
memory) {
 +            switch (this.state) {
 +                case SOFTWARE_ONLY: {
 +                    assertEquals("software", vertex.label());
 +                    assertFalse(vertex.edges(Direction.OUT).hasNext());
 +                    assertTrue(vertex.edges(Direction.IN).hasNext());
 +                    assertTrue(vertex.edges(Direction.IN, "created").hasNext());
 +                    assertFalse(vertex.edges(Direction.IN, "knows").hasNext());
 +                    break;
 +                }
 +                case PEOPLE_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertFalse(vertex.edges(Direction.IN, "created").hasNext());
 +                    assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0);
 +                    break;
 +                }
 +                case KNOWS_ONLY: {
 +                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko"))
 +                        assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                    else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh"))
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else {
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case PEOPLE_KNOWS_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko"))
 +                        assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                    else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh"))
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else {
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
 +                    if (vertex.value("name").equals("marko")) {
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                        assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"),
0.001);
 +                    } else if (vertex.value("name").equals("vadas"))
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                    else if (vertex.value("name").equals("josh")) {
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
 +                        assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"),
0.001);
 +                    } else {
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH,
"knows")));
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case VERTICES_ONLY: {
 +                    assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    break;
 +                }
 +                case ONE_OUT_EDGE_ONLY: {
 +                    if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    else {
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.OUT)));
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
 +                        assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                case OUT_EDGES_ONLY: {
 +                    if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    else {
 +                        assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) >
0);
 +                        assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
 +                        assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), IteratorUtils.count(vertex.edges(Direction.BOTH)));
 +                    }
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state for this test
case: " + this.state);
 +            }
 +        }
 +
 +        @Override
 +        public boolean terminate(final Memory memory) {
 +            return true;
 +        }
 +
 +        @Override
 +        public Set<MessageScope> getMessageScopes(Memory memory) {
 +            return Collections.emptySet();
 +        }
 +
 +        @Override
 +        public GraphComputer.ResultGraph getPreferredResultGraph() {
 +            return GraphComputer.ResultGraph.NEW;
 +        }
 +
 +        @Override
 +        public GraphComputer.Persist getPreferredPersist() {
 +            return GraphComputer.Persist.NOTHING;
 +        }
 +
 +        @Override
 +        @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
 +        public VertexProgramM clone() {
 +            return new VertexProgramM(this.state);
 +        }
 +
 +        @Override
 +        public void loadState(final Graph graph, final Configuration configuration) {
 +            this.state = configuration.getString("state");
 +        }
 +
 +        @Override
 +        public void storeState(final Configuration configuration) {
 +            configuration.setProperty("state", this.state);
 +            VertexProgram.super.storeState(configuration);
 +        }
 +
 +    }
 +
 +    private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer,
MapReduce.NullObject, Integer, Integer> {
 +
 +        private String state;
 +
 +        public MapReduceJ() {
 +        }
 +
 +        public MapReduceJ(final String state) {
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public void loadState(final Graph graph, final Configuration configuration) {
 +            this.state = configuration.getString("state");
 +        }
 +
 +        @Override
 +        public void storeState(final Configuration configuration) {
 +            configuration.setProperty("state", this.state);
 +            MapReduce.super.storeState(configuration);
 +        }
 +
 +        @Override
 +        @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
 +        public MapReduceJ clone() {
 +            return new MapReduceJ(this.state);
 +        }
 +
 +        @Override
 +        public boolean doStage(final Stage stage) {
 +            return true;
 +        }
 +
 +        @Override
 +        public void map(final Vertex vertex, final MapEmitter<NullObject, Integer>
emitter) {
 +            emitter.emit(1);
 +            switch (this.state) {
 +                case VertexProgramM.SOFTWARE_ONLY: {
 +                    assertEquals("software", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.KNOWS_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals("person", vertex.label());
 +                    break;
 +                }
 +                case VertexProgramM.VERTICES_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
 +                    break;
 +                }
 +                case VertexProgramM.OUT_EDGES_ONLY: {
 +                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state for this test
case: " + this.state);
 +            }
 +        }
 +
 +        @Override
 +        public void combine(final NullObject key, final Iterator<Integer> values,
final ReduceEmitter<NullObject, Integer> emitter) {
 +            this.reduce(key, values, emitter);
 +        }
 +
 +        @Override
 +        public void reduce(final NullObject key, final Iterator<Integer> values, final
ReduceEmitter<NullObject, Integer> emitter) {
 +            int count = 0;
 +            while (values.hasNext()) {
 +                count = count + values.next();
 +            }
 +            emitter.emit(count);
 +        }
 +
 +        @Override
 +        public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>>
keyValues) {
 +            int counter = keyValues.next().getValue();
 +            assertFalse(keyValues.hasNext());
 +
 +            switch (this.state) {
 +                case VertexProgramM.SOFTWARE_ONLY: {
 +                    assertEquals(2, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.KNOWS_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
 +                    assertEquals(4, counter);
 +                    break;
 +                }
 +                case VertexProgramM.VERTICES_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                case VertexProgramM.OUT_EDGES_ONLY: {
 +                    assertEquals(6, counter);
 +                    break;
 +                }
 +                default:
 +                    throw new IllegalStateException("This is an illegal state for this test
case: " + this.state);
 +            }
 +            return counter;
 +        }
 +
 +        @Override
 +        public String getMemoryKey() {
 +            return "a";
 +        }
 +    }
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportJobChaining() throws Exception {
 +        final ComputerResult result1 = graphProvider.getGraphComputer(graph)
 +                .program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph1 = result1.graph();
 +        final Memory memory1 = result1.memory();
 +        assertEquals(5, memory1.getIteration());
 +        assertEquals(6, graph1.traversal().V().count().next().intValue());
 +        assertEquals(6, graph1.traversal().E().count().next().intValue());
 +        assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(18, graph1.traversal().V().values().count().next().intValue());
 +        //
 +        final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass())
 +                .program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph2 = result2.graph();
 +        final Memory memory2 = result2.memory();
 +        assertTrue(memory2.getIteration() <= 4);
 +        assertEquals(6, graph2.traversal().V().count().next().intValue());
 +        assertEquals(6, graph2.traversal().E().count().next().intValue());
 +        assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
 +        assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(24, graph2.traversal().V().values().count().next().intValue());
 +        //
 +        final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
 +                .program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
 +        final Graph graph3 = result3.graph();
 +        final Memory memory3 = result3.memory();
 +        assertTrue(memory3.keys().contains("m"));
 +        assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS));
 +        assertEquals(1, memory3.<Map<Long, Long>>get("m").size());
 +        assertEquals(6, memory3.<Map<Long, Long>>get("m").get(1l).intValue());
 +        List<Traverser<String>> traversers = IteratorUtils.list(memory3.<TraverserSet>get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator());
 +        assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, b) -> a
+ b).get().longValue());
 +        assertEquals(4l, traversers.stream().filter(s -> s.get().equals("person")).map(Traverser::bulk).reduce((a,
b) -> a + b).get().longValue());
 +        assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a,
b) -> a + b).get().longValue());
 +        assertEquals(6, graph3.traversal().V().count().next().intValue());
 +        assertEquals(6, graph3.traversal().E().count().next().intValue());
 +        assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
 +        assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
 +        assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
 +        assertEquals(24, graph3.traversal().V().values().count().next().intValue()); //
no halted traversers
 +
 +        // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram
off of the PeerPressureVertexProgram job.
 +    }
 +
 +    ///////////////////////////////////
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportPreExistingComputeKeys() throws Exception {
 +        final ComputerResult result = graphProvider.getGraphComputer(graph).program(new
VertexProgramN()).submit().get();
 +        result.graph().vertices().forEachRemaining(vertex -> {
 +            if (vertex.label().equals("person")) {
 +                if (vertex.value("name").equals("marko"))
 +                    assertEquals(32, vertex.<Integer>value("age").intValue());
 +                else if (vertex.value("name").equals("peter"))
 +                    assertEquals(38, vertex.<Integer>value("age").intValue());
 +                else if (vertex.value("name").equals("vadas"))
 +                    assertEquals(30, vertex.<Integer>value("age").intValue());
 +                else if (vertex.value("name").equals("josh"))
 +                    assertEquals(35, vertex.<Integer>value("age").intValue());
 +                else
 +                    throw new IllegalStateException("This vertex should not have been accessed:
" + vertex);
 +            }
 +        });
 +    }
 +
 +    private static class VertexProgramN extends StaticVertexProgram {
 +
 +        @Override
 +        public void setup(final Memory memory) {
 +
 +        }
 +
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, final Memory
memory) {
 +            if (vertex.label().equals("person"))
 +                vertex.property(VertexProperty.Cardinality.single, "age", vertex.<Integer>value("age")
+ 1);
 +        }
 +
 +        @Override
 +        public boolean terminate(final Memory memory) {
 +            return memory.getIteration() > 1;
 +        }
 +
 +        @Override
 +        public Set<MessageScope> getMessageScopes(final Memory memory) {
 +            return Collections.emptySet();
 +        }
 +
 +        @Override
 +        public Set<VertexComputeKey> getVertexComputeKeys() {
 +            return Collections.singleton(VertexComputeKey.of("age", false));
 +        }
 +
 +        @Override
 +        public GraphComputer.ResultGraph getPreferredResultGraph() {
 +            return GraphComputer.ResultGraph.NEW;
 +        }
 +
 +        @Override
 +        public GraphComputer.Persist getPreferredPersist() {
 +            return GraphComputer.Persist.VERTEX_PROPERTIES;
 +        }
 +    }
 +
 +    ///////////////////////////////////
 +
 +    @Test
 +    @LoadGraphWith(MODERN)
 +    public void shouldSupportTransientKeys() throws Exception {
 +        final ComputerResult result = graphProvider.getGraphComputer(graph).program(new
VertexProgramO()).mapReduce(new MapReduceK()).submit().get();
 +        result.graph().vertices().forEachRemaining(vertex -> {
 +            assertFalse(vertex.property("v1").isPresent());
 +            assertFalse(vertex.property("v2").isPresent());
 +            assertTrue(vertex.property("v3").isPresent());
 +            assertEquals("shouldExist", vertex.value("v3"));
 +            assertTrue(vertex.property("name").isPresent());
 +            if (vertex.label().equals("software"))
 +                assertTrue(vertex.property("lang").isPresent());
 +            else
 +                assertTrue(vertex.property("age").isPresent());
 +            assertEquals(3, IteratorUtils.count(vertex.properties()));
 +            assertEquals(0, IteratorUtils.count(vertex.properties("v1")));
 +            assertEquals(0, IteratorUtils.count(vertex.properties("v2")));
 +            assertEquals(1, IteratorUtils.count(vertex.properties("v3")));
 +            assertEquals(1, IteratorUtils.count(vertex.properties("name")));
 +        });
 +        assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue());
 +        assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue());
 +        assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue());
 +        assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue());
 +        assertEquals(6l, result.graph().traversal().V().<String>values("name").dedup().count().next().longValue());
 +        assertEquals(1l, result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue());
 +        assertEquals("shouldExist", result.graph().traversal().V().<String>values("v3").dedup().next());
 +        ///
 +        assertFalse(result.memory().exists("m1"));
 +        assertFalse(result.memory().exists("m2"));
 +        assertTrue(result.memory().exists("m3"));
 +        assertEquals(24l, result.memory().<Long>get("m3").longValue());
 +        assertEquals(2, result.memory().keys().size());  // mapReduceK
 +    }
 +
 +    private static class VertexProgramO extends StaticVertexProgram {
 +
 +        @Override
 +        public void setup(final Memory memory) {
 +            assertFalse(memory.exists("m1"));
 +            assertFalse(memory.exists("m2"));
 +            assertFalse(memory.exists("m3"));
 +        }
 +
 +        @Override
 +        public void execute(final Vertex vertex, final Messenger messenger, final Memory
memory) {
 +            if (memory.isInitialIteration()) {
 +                assertFalse(vertex.property("v1").isPresent());
 +                assertFalse(vertex.property("v2").isPresent());
 +                assertFalse(vertex.property("v3").isPresent());
 +                vertex.property("v1", "shouldNotExist");
 +                vertex.property("v2", "shouldNotExist");
 +                vertex.property("v3", "shouldExist");
 +                assertTrue(vertex.property("v1").isPresent());
 +                assertTrue(vertex.property("v2").isPresent());
 +                assertTrue(vertex.property("v3").isPresent());
 +                assertEquals("shouldNotExist", vertex.value("v1"));
 +                assertEquals("shouldNotExist", vertex.value("v2"));
 +                assertEquals("shouldExist", vertex.value("v3"));
 +                //
 +                assertFalse(memory.exists("m1"));
 +                assertFalse(memory.exists("m2"));
 +                assertFalse(memory.exists("m3"));
 +                memory.add("m1", false);
 +                memory.add("m2", true);
 +                memory.add("m3", 2l);
 +                // should still not exist as this pulls from the master memory
 +                assertFalse(memory.exists("m1"));
 +                assertFalse(memory.exists("m2"));
 +                assertFalse(memory.exists("m3"));
 +
 +            } else {
 +                assertTrue(vertex.property("v1").isPresent());
 +                assertTrue(vertex.property("v2").isPresent());
 +                assertTrue(vertex.property("v3").isPresent());
 +                assertEquals("shouldNotExist", vertex.value("v1"));
 +                assertEquals("shouldNotExist", vertex.value("v2"));
 +                assertEquals("shouldExist", vertex.value("v3"));
 +                //
 +                assertTrue(memory.exists("m1"));
 +                assertTrue(memory.exists("m2"));
 +                assertTrue(memory.exists("m3"));
 +                assertFalse(memory.get("m1"));
 +                assertTrue(memory.get("m2"));
 +                assertEquals(12l, memory.<Long>get("m3").longValue());
 +                memory.add("m1", true);
 +                memory.add("m2", true);
 +                memory.add("m3", 2l);
              }
          }
  

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f1aed80b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------


Mime
View raw message