From: okram@apache.org
To: commits@tinkerpop.incubator.apache.org
Subject: incubator-tinkerpop git commit: Was really dumb about SparkMessenger and GiraphMessenger. No need to insert a start step. Weird. ComputerVerificationStrategy can now recognize when edge properties are being manipulated in MapReduce. Added TraversalHelper.
Date: Thu, 11 Feb 2016 21:32:38 +0000 (UTC)

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1140 e2e38e691 -> 17bfdb82c


Was really dumb about SparkMessenger and GiraphMessenger: there is no need to insert a start step. Weird. ComputerVerificationStrategy can now recognize when edge properties are being manipulated in MapReduce. Added TraversalHelper.getLastElementClass(traversal). Neato. Fixed up FileSystem access in SparkGraphComputer. I think we have a bug in 3.1.2 that breaks chaining files. Weird that we don't have a test for that. However, most people don't chain vertex programs, so I suspect no one will notice. 3.2.0 has it down pat, as the new multi-OLAP compiler needs solid chaining to work.
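The commit message mentions the new `TraversalHelper.getLastElementClass(traversal)`. In the diff below it walks backward from the traversal's end step until it meets a step that fixes the element type, and when it meets a single-key `select` it jumps back to the step carrying that label. The following is a minimal, self-contained Java sketch of that backward walk under a simplified model: `Step`, `Kind`, `lastElementClass`, and `LastElementClassSketch` are hypothetical stand-ins, not TinkerPop's API, and `Vertex`, `Edge`, and `Property` are local marker interfaces. The `PROPERTIES`/`VALUES` split mirrors how the diff distinguishes `properties()` from `values()` via `getReturnType().forProperties()`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of a linear traversal (not TinkerPop's API) that
 * illustrates walking backward from the end step to find the last element class.
 */
public final class LastElementClassSketch {

    // Local marker types standing in for the graph structure classes.
    public interface Vertex {}
    public interface Edge {}
    public interface Property {}

    // Hypothetical step kinds: GRAPH ~ V()/E(), VERTEX ~ out()/outE(), EDGE_VERTEX ~ inV()/outV(),
    // PROPERTIES ~ properties(), VALUES ~ values(), SELECT_ONE ~ select(key), OTHER ~ anything else.
    public enum Kind { GRAPH, VERTEX, EDGE_VERTEX, PROPERTIES, VALUES, SELECT_ONE, OTHER }

    public static final class Step {
        final Kind kind;
        final Class<?> returnClass; // used by GRAPH and VERTEX steps
        final String selectKey;     // used by SELECT_ONE steps
        final Step previous;        // null at the head of the traversal
        final Set<String> labels;   // step labels, as added by as(...)

        public Step(Kind kind, Class<?> returnClass, String selectKey, Step previous, String... labels) {
            this.kind = kind;
            this.returnClass = returnClass;
            this.selectKey = selectKey;
            this.previous = previous;
            this.labels = new HashSet<>(Arrays.asList(labels));
        }
    }

    public static Class<?> lastElementClass(final Step endStep) {
        Step current = endStep;
        while (current != null) {
            switch (current.kind) {
                case GRAPH:
                case VERTEX:
                    return current.returnClass;
                case EDGE_VERTEX:
                    return Vertex.class;
                case PROPERTIES:
                    return Property.class;
                case VALUES:
                    return Object.class;
                case SELECT_ONE: {
                    // Jump to the nearest earlier step carrying the selected label. Starting at the
                    // previous step keeps the walk strictly backward, so the loop always terminates.
                    Step labeled = current.previous;
                    while (labeled != null && !labeled.labels.contains(current.selectKey))
                        labeled = labeled.previous;
                    current = labeled;
                    break;
                }
                default:
                    // Filters, barriers, etc. do not change which element type is flowing.
                    current = current.previous;
            }
        }
        return Object.class; // no element-producing step was found
    }

    public static void main(String[] args) {
        // Models V().outE().as("e").inV() and V().outE().as("e").inV().select("e").
        Step v = new Step(Kind.GRAPH, Vertex.class, null, null);
        Step outE = new Step(Kind.VERTEX, Edge.class, null, v, "e");
        Step inV = new Step(Kind.EDGE_VERTEX, null, null, outE);
        Step select = new Step(Kind.SELECT_ONE, null, "e", inV);
        System.out.println(lastElementClass(inV).getSimpleName());    // Vertex
        System.out.println(lastElementClass(select).getSimpleName()); // Edge
    }
}
```

Here `select("e")` resolves to the `outE()` step labeled `e`, so the walk reports `Edge`. Under the new check in `ComputerVerificationStrategy`, a final `CollectingBarrierStep` with local children is rejected whenever this kind of lookup yields `Edge`. Starting the label search at the step before the `select` is a deliberate choice in this sketch: every iteration moves strictly backward, so the walk cannot revisit a step.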
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/17bfdb82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/17bfdb82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/17bfdb82

Branch: refs/heads/TINKERPOP-1140
Commit: 17bfdb82ce89e24295940ad1926ae95e17eea821
Parents: e2e38e6
Author: Marko A. Rodriguez
Authored: Thu Feb 11 14:32:30 2016 -0700
Committer: Marko A. Rodriguez
Committed: Thu Feb 11 14:32:30 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  7 +++-
 .../process/computer/GiraphMessenger.java       |  3 +-
 .../step/map/PageRankVertexProgramStep.java     |  4 +-
 .../mapreduce/TraverserMapReduce.java           |  7 +---
 .../ComputerVerificationStrategy.java           |  3 ++
 .../process/traversal/util/TraversalHelper.java | 28 ++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 40 +++++++++++---------
 .../spark/process/computer/SparkMessenger.java  |  3 +-
 .../computer/SparkHadoopGraphProvider.java      |  3 +-
 9 files changed, 66 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index fce64a4..fdded04 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -177,8 +177,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             // prepare the giraph vertex-centric computing job
             final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
             // handle input paths (if any)
-            if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-                FileInputFormat.setInputPaths(job.getInternalJob(), Constants.getSearchGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get());
+            String inputLocation = this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+            if (null != inputLocation && FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                inputLocation = Constants.getSearchGraphLocation(inputLocation, storage).orElse(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(this.giraphConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                this.giraphConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(this.giraphConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
             }
             // handle output paths
             final Path outputPath = new Path(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
index 4abb3e7..f5855cb 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -70,7 +69,7 @@ public final class GiraphMessenger implements Messenger {
     }

     private static > T setVertexStart(final Traversal incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        incidentTraversal.asAdmin().addStart(incidentTraversal.asAdmin().getTraverserGenerator().generate(vertex, incidentTraversal.asAdmin().getStartStep(), 1l));
         return (T) incidentTraversal;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
index c0f556c..d6b16cf 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
@@ -64,12 +64,12 @@ public final class PageRankVertexProgramStep extends AbstractStep traverser = this.starts.next();
                 final Graph graph = traverser.get().graph();
-                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES);
+                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW);
                 return traverser.split(graphComputer.program(PageRankVertexProgram.build().traversal(this.compileTraversal(graph)).create(graph)).submit().get(), this);
             }
         } catch (final InterruptedException | ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
index 88cb09c..10f8e9a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
@@ -37,9 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.EmptyTraversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
@@ -106,11 +104,8 @@ public final class TraverserMapReduce extends StaticMapReduce> emitter) {
         vertex.>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> IteratorUtils.removeOnNext(traverserSet.iterator()).forEachRemaining(traverser -> {
-            if (this.attachHaltedTraverser) {
-                if (traverser.get() instanceof Edge)
-                    throw new VerificationException("Edges can not be accessed -- this should really be caught at compile time", EmptyTraversal.instance());
+            if (this.attachHaltedTraverser && !(traverser.get() instanceof Edge))
                 traverser.attach(Attachable.Method.get(vertex));
-            }
             if (this.comparator.isPresent())    // TODO: I think we shouldn't ever single key it -- always double emit to load balance the servers.
                 emitter.emit(traverser, traverser);
             else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
index 1bbfd79..aefe44c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
@@ -44,6 +44,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierS
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.SupplyingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Edge;

 import java.util.Arrays;
 import java.util.HashSet;
@@ -109,6 +110,8 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg
         if (endStep instanceof CollectingBarrierStep && endStep instanceof TraversalParent) {
             if (((TraversalParent) endStep).getLocalChildren().stream().filter(t -> !TraversalHelper.isLocalVertex(t)).findAny().isPresent())
                 throw new VerificationException("A final CollectingBarrierStep can not process the incident edges of a vertex: " + endStep, traversal);
+            if (!((TraversalParent) endStep).getLocalChildren().isEmpty() && TraversalHelper.getLastElementClass(traversal).equals(Edge.class))
+                throw new VerificationException("The final CollectingBarrierStep can not operate on edges or their properties:" + endStep, traversal);
         }
         ///
         if (endStep instanceof RangeGlobalStep || endStep instanceof TailGlobalStep || endStep instanceof DedupGlobalStep)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
index cf8395a..1641cc8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
@@ -30,13 +30,17 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.filter.NotStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.WherePredicateStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.WhereTraversalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

 import java.util.ArrayList;
@@ -58,6 +62,30 @@ public final class TraversalHelper {
     private TraversalHelper() {
     }

+    public static Class getLastElementClass(final Traversal.Admin traversal) {
+        Step currentStep = traversal.getEndStep();
+        while (!(currentStep instanceof EmptyStep)) {
+            if (currentStep instanceof VertexStep)
+                return ((VertexStep) currentStep).getReturnClass();
+            else if (currentStep instanceof GraphStep)
+                return ((GraphStep) currentStep).getReturnClass();
+            else if (currentStep instanceof EdgeVertexStep)
+                return Vertex.class;
+            else if (currentStep instanceof PropertiesStep)
+                return ((PropertiesStep) currentStep).getReturnType().forProperties() ? Property.class : Object.class;
+            else if (currentStep instanceof SelectOneStep) {
+                final String key = ((SelectOneStep) currentStep).getScopeKeys().iterator().next();
+                while (!(currentStep instanceof EmptyStep)) {
+                    if (currentStep.getLabels().contains(key))
+                        break;
+                    currentStep = currentStep.getPreviousStep();
+                }
+            } else
+                currentStep = currentStep.getPreviousStep();
+        }
+        return Object.class;
+    }
+
     public static boolean isLocalVertex(final Traversal.Admin traversal) {
         for (final Step step : traversal.getSteps()) {
             if (step instanceof RepeatStep &&

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d783020..4d8934f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -118,31 +118,37 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     }

     private Future submitWithExecutor(Executor exec) {
-        // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) == null && // if an InputRDD is specified, then ignore InputFormat
-                hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, null) != null &&
-                FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-
         // create the completable future
         return CompletableFuture.supplyAsync(() -> {
             final long startTime = System.currentTimeMillis();
+            // apache and hadoop configurations that are used throughout the graph computer computation
+            final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+            apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
             final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
-            // final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
+            final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, Object.class));
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class));
+            String inputLocation = null;
+            if (inputFromSpark)
+                inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
+            else if (inputFromHDFS)
+                inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), fileSystemStorage).orElse(null);
+            if (null == inputLocation)
+                inputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+
+            if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) == null && // if an InputRDD is specified, then ignore InputFormat
+                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, null) != null &&
+                    FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                try {
+                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
             final InputRDD inputRDD;
             final OutputRDD outputRDD;
             final boolean filtered;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
index a3b11c8..5b04e2d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -22,7 +22,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -73,7 +72,7 @@ public final class SparkMessenger implements Messenger {
     ///////////

     private static > T setVertexStart(final Traversal incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        incidentTraversal.asAdmin().addStart(incidentTraversal.asAdmin().getTraverserGenerator().generate(vertex, incidentTraversal.asAdmin().getStartStep(), 1l));
         return (T) incidentTraversal;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 0730ba8..a6010b6 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PageRankTest;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
@@ -50,7 +51,7 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
     public Map getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test suite go really fast
-        if (!test.equals(FileSystemStorageCheck.class) && null != loadGraphWith && RANDOM.nextBoolean()) {
+        if (!test.equals(PageRankTest.Traversals.class) && !test.equals(FileSystemStorageCheck.class) && null != loadGraphWith && RANDOM.nextBoolean()) {
             config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
         }
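The SparkGraphComputer hunk resolves the input location before it sets the `FileInputFormat` input directory. It searches the storage that matches the input type (the Spark context when the input RDD is a `PersistedInputRDD`, the file system when the input format is file-based) and falls back to the configured `GREMLIN_HADOOP_INPUT_LOCATION` when nothing is found. The sketch below models only that precedence in plain Java. Storage is an in-memory set of existing paths. The search rule, which checks `<location>/~g` first and then `<location>` itself, is an assumption about what `Constants.getSearchGraphLocation` does. `InputLocationSketch` and its methods are hypothetical names.

```java
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch of input-location resolution for chained OLAP jobs.
 * Storage is modeled as an in-memory set of existing paths (an assumption).
 */
public final class InputLocationSketch {

    // Assumed convention: a previous job persisted its graph under "<location>/~g".
    static String graphLocation(final String location) {
        return location.endsWith("/") ? location + "~g" : location + "/~g";
    }

    // Assumed search rule: prefer the graph sub-directory, then the location itself.
    static Optional<String> searchGraphLocation(final String location, final Set<String> storage) {
        if (storage.contains(graphLocation(location)))
            return Optional.of(graphLocation(location));
        if (storage.contains(location))
            return Optional.of(location);
        return Optional.empty();
    }

    // Mirrors the precedence in the hunk: search the storage that matches the input type,
    // and fall back to the configured location unchanged when the search finds nothing.
    public static String resolveInputLocation(final String configured,
                                              final boolean inputFromSpark,
                                              final boolean inputFromHDFS,
                                              final Set<String> sparkContextStorage,
                                              final Set<String> fileSystemStorage) {
        Optional<String> found = Optional.empty();
        if (inputFromSpark)
            found = searchGraphLocation(configured, sparkContextStorage);
        else if (inputFromHDFS)
            found = searchGraphLocation(configured, fileSystemStorage);
        return found.orElse(configured);
    }

    public static void main(String[] args) {
        Set<String> hdfs = Set.of("output/~g", "graph.kryo");
        System.out.println(resolveInputLocation("output", false, true, Set.of(), hdfs));     // output/~g
        System.out.println(resolveInputLocation("graph.kryo", false, true, Set.of(), hdfs)); // graph.kryo
        System.out.println(resolveInputLocation("missing", false, true, Set.of(), hdfs));    // missing
    }
}
```

Under these assumptions, a second job configured to read `output` picks up the graph that a first job wrote to `output/~g`. This is the file-chaining case the commit message is concerned with. Returning `found.orElse(configured)` is equivalent to the hunk's `orElse(null)` followed by the `null == inputLocation` fallback.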