From commits-return-26360-archive-asf-public=cust-asf.ponee.io@tinkerpop.apache.org Fri Feb 23 13:01:20 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9E991180652 for ; Fri, 23 Feb 2018 13:01:19 +0100 (CET) Received: (qmail 31922 invoked by uid 500); 23 Feb 2018 12:01:18 -0000 Mailing-List: contact commits-help@tinkerpop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.apache.org Delivered-To: mailing list commits@tinkerpop.apache.org Received: (qmail 31902 invoked by uid 99); 23 Feb 2018 12:01:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2018 12:01:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E3F4E0AEB; Fri, 23 Feb 2018 12:01:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: spmallette@apache.org To: commits@tinkerpop.apache.org Date: Fri, 23 Feb 2018 12:01:18 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] tinkerpop git commit: Apply edgeFunction in SparkMessenger Repository: tinkerpop Updated Branches: refs/heads/tp32 d42d54d6a -> 3df6c5806 Apply edgeFunction in SparkMessenger Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/744c4ecb Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/744c4ecb Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/744c4ecb Branch: refs/heads/tp32 Commit: 744c4ecbc042acdc481045d6df5077d52d370ac9 Parents: 92a09d8 Author: zhuchenchen Authored: Mon Jan 22 11:15:41 2018 +0800 Committer: zhuchenchen Committed: Mon Jan 22 11:15:41 2018 +0800 
---------------------------------------------------------------------- .../spark/process/computer/SparkMessenger.java | 2 +- .../process/computer/SparkMessengerTest.java | 86 ++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java index aab7ecd..53a755c 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java @@ -63,7 +63,7 @@ public final class SparkMessenger implements Messenger { final MessageScope.Local localMessageScope = (MessageScope.Local) messageScope; final Traversal.Admin incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex); final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal); - incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message))); + incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), localMessageScope.getEdgeFunction().apply(message, edge)))); } else { ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message))); } 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/744c4ecb/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java new file mode 100644 index 0000000..c280ab2 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessengerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.process.computer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.tinkerpop.gremlin.process.computer.MessageScope; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; +import org.junit.Assert; +import org.junit.Test; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.List; +import java.util.function.BiFunction; + +/** + * @author Dean Zhu + */ +public class SparkMessengerTest extends AbstractSparkTest { + private static ObjectMapper objectmapper = new ObjectMapper(); + + @Test + public void testSparkMessenger() throws Exception { + // Define scopes + final MessageScope.Local<String> orderSrcMessageScope = MessageScope.Local + .of(__::inE, new BiFunction<String, Edge, String>() { + @Override + public String apply(String message, Edge edge) { + System.out.println(edge); + if ("mocked_edge_label1".equals(edge.label())) { + return message; + } + return null; + } + }); + final MessageScope.Local<String> inMessageScope = MessageScope.Local.of(__::inE); + + // Define star graph + final StarGraph starGraph = StarGraph.open(); + Object[] vertex0Array = new Object[]{T.id, 0, T.label, "mocked_vertex_label1"}; + Object[] vertex1Array = new Object[]{T.id, 1, T.label, "mocked_vertex_label2"}; + Object[] vertex2Array = new Object[]{T.id, 2, T.label, "mocked_vertex_label2"}; + Vertex vertex0 = starGraph.addVertex(vertex0Array); + Vertex vertex1 = starGraph.addVertex(vertex1Array); + Vertex vertex2 = starGraph.addVertex(vertex2Array); + vertex1.addEdge("mocked_edge_label1", vertex0); + vertex2.addEdge("mocked_edge_label2", vertex0); + + // Create Spark Messenger + final SparkMessenger<String> messenger = new SparkMessenger<>(); + final 
List<String> incomingMessages = Arrays.asList("a", "b", "c"); + messenger.setVertexAndIncomingMessages(vertex0, incomingMessages); + + messenger.sendMessage(orderSrcMessageScope, "a"); + List<Tuple2<Object, String>> outgoingMessages0 = messenger.getOutgoingMessages(); + System.out.println(objectmapper.writeValueAsString(outgoingMessages0)); + + Assert.assertEquals("a", outgoingMessages0.get(0)._2()); + Assert.assertNull(outgoingMessages0.get(1)._2()); + //messenger.sendMessage(inMessageScope, "a"); + //List<Tuple2<Object, String>> outgoingMessages1 = messenger.getOutgoingMessages(); + //System.out.println(objectmapper.writeValueAsString(outgoingMessages1)); + } +} \ No newline at end of file