Return-Path: X-Original-To: apmail-tinkerpop-commits-archive@minotaur.apache.org Delivered-To: apmail-tinkerpop-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5833018BFA for ; Wed, 2 Dec 2015 20:24:31 +0000 (UTC) Received: (qmail 94809 invoked by uid 500); 2 Dec 2015 20:24:31 -0000 Delivered-To: apmail-tinkerpop-commits-archive@tinkerpop.apache.org Received: (qmail 94783 invoked by uid 500); 2 Dec 2015 20:24:31 -0000 Mailing-List: contact commits-help@tinkerpop.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tinkerpop.incubator.apache.org Delivered-To: mailing list commits@tinkerpop.incubator.apache.org Received: (qmail 94774 invoked by uid 99); 2 Dec 2015 20:24:31 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Dec 2015 20:24:31 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id B9FB8C0473 for ; Wed, 2 Dec 2015 20:24:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.247 X-Spam-Level: * X-Spam-Status: No, score=1.247 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Jlt2yLTKiGFK for ; Wed, 2 Dec 2015 20:24:21 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id A688121622 for ; Wed, 2 Dec 2015 20:24:20 +0000 (UTC) Received: (qmail 94709 invoked by uid 99); 2 Dec 2015 20:24:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Dec 2015 20:24:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D05BE00DE; Wed, 2 Dec 2015 20:24:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: okram@apache.org To: commits@tinkerpop.incubator.apache.org Message-Id: <17c47471e20b406db44301b97f3ce9da@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-tinkerpop git commit: some organization and clean up. Stuff is lookin SOLID. Time to run full integration tests. Date: Wed, 2 Dec 2015 20:24:20 +0000 (UTC) Repository: incubator-tinkerpop Updated Branches: refs/heads/TINKERPOP3-1011 311e8abe7 -> e20ff9199 some organization and clean up. Stuff is lookin SOLID. Time to run full integration tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e20ff919 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e20ff919 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e20ff919 Branch: refs/heads/TINKERPOP3-1011 Commit: e20ff91995f83efd4ebbe9388b0aec1535669ecb Parents: 311e8ab Author: Marko A. Rodriguez Authored: Wed Dec 2 13:11:29 2015 -0700 Committer: Marko A. Rodriguez Committed: Wed Dec 2 13:11:29 2015 -0700 ---------------------------------------------------------------------- .../process/computer/SparkContextHelper.java | 39 +++++++++++ .../process/computer/SparkGraphComputer.java | 15 ++--- .../spark/structure/io/InputRDDFormat.java | 8 +-- .../process/computer/LocalPropertyTest.java | 4 +- .../computer/SparkHadoopGraphProvider.java | 38 +++-------- .../spark/structure/io/ClassicInputRDD.java | 39 ----------- .../spark/structure/io/GratefulInputRDD.java | 50 -------------- .../spark/structure/io/InputOutputRDDTest.java | 1 - .../spark/structure/io/InputRDDTest.java | 1 - .../spark/structure/io/ModernInputRDD.java | 41 ------------ .../spark/structure/io/OutputRDDTest.java | 1 - .../io/PersistedInputOutputRDDTest.java | 10 +-- .../spark/structure/io/TheCrewInputRDD.java | 39 ----------- .../spark/structure/io/ToyGraphInputRDD.java | 69 ++++++++++++++++++++ 14 files changed, 134 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java new file mode 100644 index 0000000..ed553c5 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.process.computer; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.Constants; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class SparkContextHelper { + + private SparkContextHelper() { + + } + + public static void tryToCloseContext(final JavaSparkContext context, final Configuration configuration) { + if (context != null && !configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) + context.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index e7566d5..c20f1b0 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -228,8 +228,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { finalMemory.setRuntime(System.currentTimeMillis() - startTime); return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable()); } finally { - if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) - sparkContext.stop(); + SparkContextHelper.tryToCloseContext(sparkContext, apacheConfiguration); } }, exec); } @@ -268,14 +267,14 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { * Execution rather than applying the entire configuration. */ final String[] validPropertyNames = { - "spark.job.description", - "spark.jobGroup.id", - "spark.job.interruptOnCancel", - "spark.scheduler.pool" + "spark.job.description", + "spark.jobGroup.id", + "spark.job.interruptOnCancel", + "spark.scheduler.pool" }; - for (String propertyName: validPropertyNames){ - if (sparkConfiguration.contains(propertyName)){ + for (String propertyName : validPropertyNames) { + if (sparkConfiguration.contains(propertyName)) { String propertyValue = sparkConfiguration.get(propertyName); this.logger.info("Setting Thread Local SparkContext Property - " + propertyName + " : " + propertyValue); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java index 3952c66..d1a198e 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java @@ -31,6 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkContextHelper; import scala.Tuple2; import java.io.IOException; @@ -70,8 +71,8 @@ public final class InputRDDFormat extends InputFormat sparkConfiguration.set(entry.getKey(), entry.getValue())); - InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance(); - JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); + final InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance(); + final JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); final Iterator> iterator = inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()), javaSparkContext).toLocalIterator(); return new RecordReader() { @Override @@ -101,8 +102,7 @@ public final class InputRDDFormat extends InputFormat getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); if (null != loadGraphWith) { - if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName()); + config.put(ToyGraphInputRDD.GREMLIN_SPARK_TOY_GRAPH, loadGraphWith.toString()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); } } /// spark configuration config.put("spark.master", "local[4]"); - //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer"); + config.put("spark.serializer", GryoSerializer.class.getCanonicalName()); config.put("spark.kryo.registrationRequired", true); return config; } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java deleted file mode 100644 index 4512b61..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ClassicInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java deleted file mode 100644 index 396aa75..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -import java.io.IOException; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class GratefulInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - try { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java index ea62114..50a43bc 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java index 2cbfd66..b64139c 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java deleted file mode 100644 index 849e3e6..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ModernInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext. - parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new))). - mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java index f9b6f39..60790e7 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java index 6aeb864..1de2b47 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java @@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; @@ -37,6 +36,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.IoCore; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; @@ -58,7 +58,7 @@ public class PersistedInputOutputRDDTest { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); @@ -87,7 +87,7 @@ public class PersistedInputOutputRDDTest { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); @@ -129,7 +129,7 @@ public class PersistedInputOutputRDDTest { final String rddName = "target/test-output/" + UUID.randomUUID().toString(); final Configuration readConfiguration = new BaseConfiguration(); readConfiguration.setProperty("spark.master", "local[4]"); - readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); @@ -176,7 +176,7 @@ public class PersistedInputOutputRDDTest { final String rddName = "target/test-output/" + UUID.randomUUID().toString(); final Configuration readConfiguration = new BaseConfiguration(); readConfiguration.setProperty("spark.master", "local[4]"); - readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java deleted file mode 100644 index ff5a274..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class TheCrewInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e20ff919/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java new file mode 100644 index 0000000..ed97c04 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ToyGraphInputRDD implements InputRDD { + + public static final String GREMLIN_SPARK_TOY_GRAPH = "gremlin.spark.toyGraph"; + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + final List vertices; + if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.MODERN.toString())) + vertices = IteratorUtils.list(TinkerFactory.createModern().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CLASSIC.toString())) + vertices = IteratorUtils.list(TinkerFactory.createClassic().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CREW.toString())) + vertices = IteratorUtils.list(TinkerFactory.createTheCrew().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.GRATEFUL.toString())) { + try { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); + vertices = IteratorUtils.list(graph.vertices()); + } catch (final IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } else + throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH)); + + return sparkContext.parallelize(vertices.stream().map(VertexWritable::new).collect(Collectors.toList())).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } +}