From: okram@apache.org
To: commits@tinkerpop.incubator.apache.org
Reply-To: dev@tinkerpop.incubator.apache.org
Subject: incubator-tinkerpop git commit: I have the SparkIntegrationTestSuite now testing either from Gryo FileInputFormat, GraphSON FileInputFormat, or an InputRDD. This gives us super coverage and proves that InputRDD (bypassing Hadoop) is working as expected.
Date: Wed, 2 Dec 2015 19:02:19 +0000 (UTC)

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP3-1011 e954408b6 -> c374f4ecb

I have the SparkIntegrationTestSuite now testing either from Gryo FileInputFormat, GraphSON FileInputFormat, or an InputRDD. This gives us super coverage and proves that InputRDD (bypassing Hadoop) is working as expected. I also fixed up some other tests that used KryoSerializer instead of GryoSerializer as I learned how to deal with Scala's WrappedArray class. It was insane. This is really good stuff.

Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c374f4ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c374f4ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c374f4ec

Branch: refs/heads/TINKERPOP3-1011
Commit: c374f4ecb1c2c82f911b5fb0a19a66ed663eea60
Parents: e954408
Author: Marko A. Rodriguez
Authored: Wed Dec 2 12:02:09 2015 -0700
Committer: Marko A. Rodriguez
Committed: Wed Dec 2 12:02:09 2015 -0700

----------------------------------------------------------------------
 .../structure/hdfs/HadoopElementIterator.java   |  6 ++-
 .../computer/payload/MessagePayload.java        |  5 +-
 .../computer/payload/ViewIncomingPayload.java   |  6 +--
 .../computer/payload/ViewOutgoingPayload.java   | 12 +++--
 .../spark/structure/io/gryo/GryoSerializer.java |  6 +++
 .../io/gryo/WrappedArraySerializer.java         | 46 ++++++++++++++++++
 .../computer/SparkHadoopGraphProvider.java      | 40 ++++++++++++++--
 .../spark/structure/io/ClassicInputRDD.java     | 39 +++++++++++++++
 .../spark/structure/io/GratefulInputRDD.java    | 50 ++++++++++++++++++++
 .../spark/structure/io/InputOutputRDDTest.java  |  3 +-
 .../spark/structure/io/InputRDDTest.java        |  5 +-
 .../spark/structure/io/ModernInputRDD.java      | 41 ++++++++++++++++
 .../spark/structure/io/OutputRDDTest.java       |  3 +-
 .../spark/structure/io/TheCrewInputRDD.java     | 39 +++++++++++++++
 14 files changed, 285 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
index f9ffea2..45f3c55 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
@@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -56,7 +58,9 @@ public abstract class HadoopElementIterator implements Iterat
             final InputFormat inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
             if (inputFormat instanceof FileInputFormat) {
                 if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
-                    return; // there is not input location and thus, no data (empty graph)
+                    return; // there is no input location and thus, no data (empty graph)
+                if (!FileSystem.get(configuration).exists(new Path(this.graph.configuration().getInputLocation())))
+                    return; // there is no data at the input location (empty graph)
                 configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation());
             }
             final List splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
index 09e2599..f32ec44 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
@@ -23,7 +23,10 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
  */
 public final class MessagePayload implements Payload {

-    private final M message;
+    private M message;
+
+    private MessagePayload() {
+    }

     public MessagePayload(final M message) {
         this.message = message;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 911fc7b..a2c9205 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -31,11 +31,11 @@ import java.util.List;
 public final class ViewIncomingPayload implements Payload {

     private List> view = null;
-    private final List incomingMessages;
+    private List incomingMessages;

-    public ViewIncomingPayload() {
-        this.incomingMessages = null;
+    private ViewIncomingPayload() {
+
     }

     public ViewIncomingPayload(final MessageCombiner messageCombiner) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
index fc4aeed..20c8e09 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -28,10 +28,14 @@ import java.util.List;
  */
 public final class ViewOutgoingPayload implements Payload {

-    private final List> view;
-    private final List> outgoingMessages;
+    private List> view;
+    private List> outgoingMessages;

-    public ViewOutgoingPayload(final List> view, final List> outgoingMessages) {
+    private ViewOutgoingPayload() {
+
+    }
+
+    public ViewOutgoingPayload(final List> view, final List> outgoingMessages) {
         this.view = view;
         this.outgoingMessages = outgoingMessages;
     }
@@ -40,7 +44,7 @@ public final class ViewOutgoingPayload implements Payload {
         return new ViewPayload(this.view);
     }

-    public List> getOutgoingMessages() {
+    public List> getOutgoingMessages() {
         return this.outgoingMessages;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 7202892..29fde9f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -41,6 +41,8 @@ import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer;
 import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;

 import java.util.Collections;
@@ -79,11 +81,15 @@ public final class GryoSerializer extends Serializer {
                 try {
                     builder.addCustom(SerializableWritable.class, new JavaSerializer())
                             .addCustom(Tuple2.class, new JavaSerializer())
+                            .addCustom(Tuple2[].class, new JavaSerializer())
+                            .addCustom(Tuple3.class, new JavaSerializer())
+                            .addCustom(Tuple3[].class, new JavaSerializer())
                             .addCustom(CompressedMapStatus.class, new JavaSerializer())
                             .addCustom(HttpBroadcast.class, new JavaSerializer())
                             .addCustom(PythonBroadcast.class, new JavaSerializer())
                             .addCustom(BoxedUnit.class, new JavaSerializer())
                             .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
+                            .addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer())
                             .addCustom(MessagePayload.class)
                             .addCustom(ViewIncomingPayload.class)
                             .addCustom(ViewOutgoingPayload.class)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
new file mode 100644
index 0000000..0e9f03f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.tinkerpop.shaded.kryo.Kryo;
+import org.apache.tinkerpop.shaded.kryo.Serializer;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.WrappedArray;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WrappedArraySerializer extends Serializer> {
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final WrappedArray iterable) {
+        kryo.writeClassAndObject(output,new ArrayList<>(JavaConversions.asJavaList(iterable)));
+    }
+
+    @Override
+    public WrappedArray read(final Kryo kryo, final Input input, final Class> aClass) {
+        return new WrappedArray.ofRef<>((T[]) ((List) kryo.readClassAndObject(input)).toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index c81ea92..2328176 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -20,12 +20,19 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;

 import org.apache.tinkerpop.gremlin.GraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.structure.io.ClassicInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.GratefulInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat;
+import org.apache.tinkerpop.gremlin.spark.structure.io.ModernInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.TheCrewInputRDD;
 import org.apache.tinkerpop.gremlin.structure.Graph;

 import java.util.Map;
+import java.util.Random;

 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -33,13 +40,41 @@ import java.util.Map;
 @GraphProvider.Descriptor(computer = SparkGraphComputer.class)
 public final class SparkHadoopGraphProvider extends HadoopGraphProvider {

+    private static final Random RANDOM = new Random();
+
     @Override
     public Map getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
-        config.put("mapreduce.job.reduces", 4);
+        if (null != loadGraphWith) {
+            if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) {
+                if (RANDOM.nextBoolean()) {
+                    config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+                    config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName());
+                    config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
+                }
+            }
+        }
         /// spark configuration
         config.put("spark.master", "local[4]");
-        // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
         config.put("spark.kryo.registrationRequired", true);
         return config;
@@ -49,5 +84,4 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
     public GraphTraversalSource traversal(final Graph graph) {
         return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
new file mode 100644
index 0000000..4512b61
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ClassicInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
new file mode 100644
index 0000000..396aa75
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GratefulInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        try {
+            final Graph graph = TinkerGraph.open();
+            graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile());
+            return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
index 3691aba..ea62114 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -41,7 +42,7 @@ public class InputOutputRDDTest {
     public void shouldReadFromWriteToArbitraryRDD() throws Exception {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
index 98a2b9f..2cbfd66 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -42,7 +43,7 @@ public class InputRDDTest {
     public void shouldReadFromArbitraryRDD() {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
@@ -58,7 +59,7 @@ public class InputRDDTest {
     public void shouldSupportHadoopGraphOLTP() {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
new file mode 100644
index 0000000..849e3e6
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ModernInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        return sparkContext.
+                parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new))).
+                mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
index 10eecb3..f9b6f39 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
@@ -43,7 +44,7 @@ public class OutputRDDTest {
     public void shouldWriteToArbitraryRDD() throws Exception {
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c374f4ec/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
new file mode 100644
index 0000000..ff5a274
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TheCrewInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
+    }
+}
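
Note on the input-source switch in SparkHadoopGraphProvider: the four near-identical branches in the commit each do the same three things (drop the file input location, name an InputRDD class, set InputRDDFormat as the input format). The following is only a rough, standalone sketch of that pattern, not code from the commit. The class name InputRddConfigSketch, the configure method, and the literal key strings are hypothetical stand-ins for the Constants.* fields, and the coin flip is passed in as a boolean so the behaviour is repeatable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of the TinkerPop code base.
public final class InputRddConfigSketch {

    // Assumed stand-ins for Constants.GREMLIN_HADOOP_INPUT_LOCATION and friends.
    static final String INPUT_LOCATION = "gremlin.hadoop.inputLocation";
    static final String INPUT_RDD = "gremlin.spark.graphInputRDD";
    static final String INPUT_FORMAT = "gremlin.hadoop.graphInputFormat";

    static final String IO_PACKAGE = "org.apache.tinkerpop.gremlin.spark.structure.io.";

    // Toy graph name -> test InputRDD class added by the commit.
    static final Map<String, String> INPUT_RDDS = new HashMap<>();

    static {
        INPUT_RDDS.put("MODERN", IO_PACKAGE + "ModernInputRDD");
        INPUT_RDDS.put("CREW", IO_PACKAGE + "TheCrewInputRDD");
        INPUT_RDDS.put("CLASSIC", IO_PACKAGE + "ClassicInputRDD");
        INPUT_RDDS.put("GRATEFUL", IO_PACKAGE + "GratefulInputRDD");
    }

    // Returns a copy of the base config; when useRdd is true and the graph has a
    // known InputRDD, the file location is removed and the RDD path is configured.
    static Map<String, Object> configure(final Map<String, Object> base, final String graphData, final boolean useRdd) {
        final Map<String, Object> config = new HashMap<>(base);
        final String inputRdd = null == graphData ? null : INPUT_RDDS.get(graphData);
        if (useRdd && null != inputRdd) {
            config.remove(INPUT_LOCATION);
            config.put(INPUT_RDD, inputRdd);
            config.put(INPUT_FORMAT, IO_PACKAGE + "InputRDDFormat");
        }
        return config;
    }

    public static void main(final String[] args) {
        final Map<String, Object> base = new HashMap<>();
        base.put(INPUT_LOCATION, "/tmp/tinkerpop-modern.kryo"); // placeholder path
        System.out.println(configure(base, "MODERN", true));   // InputRDD variant
        System.out.println(configure(base, "MODERN", false));  // file-based variant
    }
}
```

In the commit itself the choice is made with RANDOM.nextBoolean(), so each test run exercises one path or the other; passing the flag explicitly, as in this sketch, would be one way to make a failing run reproducible.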