tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject incubator-tinkerpop git commit: Was really dumb about SparkMessenger and GiraphMessenger. No need to insert a start step. Weird. ComputerVerificationStrategy can now recognize when edge properties are being manipulated in MapReduce. Added TraversalHelper.
Date Thu, 11 Feb 2016 21:32:38 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1140 e2e38e691 -> 17bfdb82c


Was really dumb about SparkMessenger and GiraphMessenger. No need to insert a StartStep; adding a
start traverser to the incident traversal via addStart() is enough. Weird. ComputerVerificationStrategy
can now recognize when edge properties are being manipulated in MapReduce. Added
TraversalHelper.getLastElementClass(traversal). Neato. Fixed up FileSystem access in
SparkGraphComputer. I think we have a bug in 3.1.2 that breaks chaining of files.
Weird we don't have a test for that. However, most people don't chain vertex programs, so I
suspect no one will notice. 3.2.0 has it down pat, as solid chaining is required for the new
multi-OLAP compiler to work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/17bfdb82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/17bfdb82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/17bfdb82

Branch: refs/heads/TINKERPOP-1140
Commit: 17bfdb82ce89e24295940ad1926ae95e17eea821
Parents: e2e38e6
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Thu Feb 11 14:32:30 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Feb 11 14:32:30 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  7 +++-
 .../process/computer/GiraphMessenger.java       |  3 +-
 .../step/map/PageRankVertexProgramStep.java     |  4 +-
 .../mapreduce/TraverserMapReduce.java           |  7 +---
 .../ComputerVerificationStrategy.java           |  3 ++
 .../process/traversal/util/TraversalHelper.java | 28 ++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 40 +++++++++++---------
 .../spark/process/computer/SparkMessenger.java  |  3 +-
 .../computer/SparkHadoopGraphProvider.java      |  3 +-
 9 files changed, 66 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index fce64a4..fdded04 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -177,8 +177,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer
imple
                 // prepare the giraph vertex-centric computing job
                 final GiraphJob job = new GiraphJob(this.giraphConfiguration, Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX
+ this.vertexProgram);
                 // handle input paths (if any)
-                if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
InputFormat.class))) {
-                    FileInputFormat.setInputPaths(job.getInternalJob(), Constants.getSearchGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
storage).get());
+                String inputLocation = this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION,
null);
+                if (null != inputLocation && FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
InputFormat.class))) {
+                    inputLocation = Constants.getSearchGraphLocation(inputLocation, storage).orElse(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(this.giraphConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    this.giraphConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(this.giraphConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                 }
                 // handle output paths
                 final Path outputPath = new Path(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
index 4abb3e7..f5855cb 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMessenger.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -70,7 +69,7 @@ public final class GiraphMessenger<M> implements Messenger<M>
{
     }
 
     private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final
Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(),
vertex));
+        incidentTraversal.asAdmin().addStart(incidentTraversal.asAdmin().getTraverserGenerator().generate(vertex,
incidentTraversal.asAdmin().getStartStep(), 1l));
         return (T) incidentTraversal;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
index c0f556c..d6b16cf 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PageRankVertexProgramStep.java
@@ -64,12 +64,12 @@ public final class PageRankVertexProgramStep extends AbstractStep<ComputerResult
             if (this.first && this.getPreviousStep() instanceof EmptyStep) {
                 this.first = false;
                 final Graph graph = this.getTraversal().getGraph().get();
-                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES);
+                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW);
                 return this.traversal.getTraverserGenerator().generate(graphComputer.program(PageRankVertexProgram.build().traversal(this.compileTraversal(graph)).create(graph)).submit().get(),
this, 1l);
             } else {
                 final Traverser.Admin<ComputerResult> traverser = this.starts.next();
                 final Graph graph = traverser.get().graph();
-                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES);
+                final GraphComputer graphComputer = this.graphComputerFunction.apply(graph).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW);
                 return traverser.split(graphComputer.program(PageRankVertexProgram.build().traversal(this.compileTraversal(graph)).create(graph)).submit().get(),
this);
             }
         } catch (final InterruptedException | ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
index 88cb09c..10f8e9a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
@@ -37,9 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.EmptyTraversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
@@ -106,11 +104,8 @@ public final class TraverserMapReduce extends StaticMapReduce<Comparable,
Traver
     @Override
     public void map(final Vertex vertex, final MapEmitter<Comparable, Traverser<?>>
emitter) {
         vertex.<TraverserSet<Object>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet
-> IteratorUtils.removeOnNext(traverserSet.iterator()).forEachRemaining(traverser ->
{
-            if (this.attachHaltedTraverser) {
-                if (traverser.get() instanceof Edge)
-                    throw new VerificationException("Edges can not be accessed -- this should
really be caught at compile time", EmptyTraversal.instance());
+            if (this.attachHaltedTraverser && !(traverser.get() instanceof Edge))
                 traverser.attach(Attachable.Method.get(vertex));
-            }
             if (this.comparator.isPresent())    // TODO: I think we shouldn't ever single
key it  -- always double emit to load balance the servers.
                 emitter.emit(traverser, traverser);
             else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
index 1bbfd79..aefe44c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
@@ -44,6 +44,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierS
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.SupplyingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -109,6 +110,8 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg
             if (endStep instanceof CollectingBarrierStep && endStep instanceof TraversalParent)
{
                 if (((TraversalParent) endStep).getLocalChildren().stream().filter(t ->
!TraversalHelper.isLocalVertex(t)).findAny().isPresent())
                     throw new VerificationException("A final CollectingBarrierStep can not
process the incident edges of a vertex: " + endStep, traversal);
+                if (!((TraversalParent) endStep).getLocalChildren().isEmpty() &&
TraversalHelper.getLastElementClass(traversal).equals(Edge.class))
+                    throw new VerificationException("The final CollectingBarrierStep can
not operate on edges or their properties:" + endStep, traversal);
             }
             ///
             if (endStep instanceof RangeGlobalStep || endStep instanceof TailGlobalStep ||
endStep instanceof DedupGlobalStep)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
index cf8395a..1641cc8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
@@ -30,13 +30,17 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.filter.NotStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.WherePredicateStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.WhereTraversalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.ArrayList;
@@ -58,6 +62,30 @@ public final class TraversalHelper {
     private TraversalHelper() {
     }
 
+    public static Class getLastElementClass(final Traversal.Admin<?, ?> traversal)
{
+        Step<?, ?> currentStep = traversal.getEndStep();
+        while (!(currentStep instanceof EmptyStep)) {
+            if (currentStep instanceof VertexStep)
+                return ((VertexStep) currentStep).getReturnClass();
+            else if (currentStep instanceof GraphStep)
+                return ((GraphStep) currentStep).getReturnClass();
+            else if (currentStep instanceof EdgeVertexStep)
+                return Vertex.class;
+            else if (currentStep instanceof PropertiesStep)
+                return ((PropertiesStep) currentStep).getReturnType().forProperties() ? Property.class
: Object.class;
+            else if (currentStep instanceof SelectOneStep) {
+                final String key = ((SelectOneStep<?, ?>) currentStep).getScopeKeys().iterator().next();
+                while (!(currentStep instanceof EmptyStep)) {
+                    if (currentStep.getLabels().contains(key))
+                        break;
+                    currentStep = currentStep.getPreviousStep();
+                }
+            } else
+                currentStep = currentStep.getPreviousStep();
+        }
+        return Object.class;
+    }
+
     public static boolean isLocalVertex(final Traversal.Admin<?, ?> traversal) {
         for (final Step step : traversal.getSteps()) {
             if (step instanceof RepeatStep &&

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d783020..4d8934f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -118,31 +118,37 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
     }
 
     private Future<ComputerResult> submitWithExecutor(Executor exec) {
-        // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES,
this.persist.equals(GraphComputer.Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) == null
&& // if an InputRDD is specified, then ignore InputFormat
-                hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, null)
!= null &&
-                FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new
Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
inputLocation);
-                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-
         // create the completable future                                                
   
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             final long startTime = System.currentTimeMillis();
+            // apache and hadoop configurations that are used throughout the graph computer
computation
+            final org.apache.commons.configuration.Configuration apacheConfiguration = new
HadoopConfiguration(this.sparkConfiguration);
+            apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES,
this.persist.equals(GraphComputer.Persist.EDGES));
+            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
             final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
-            // final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT,
Object.class));
+            final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
Object.class));
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD,
Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT,
Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD,
Object.class));
+            String inputLocation = null;
+            if (inputFromSpark)
+                inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
sparkContextStorage).orElse(null);
+            else if (inputFromHDFS)
+                inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
fileSystemStorage).orElse(null);
+            if (null == inputLocation)
+                inputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+
+            if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) ==
null && // if an InputRDD is specified, then ignore InputFormat
+                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
null) != null &&
+                    FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT,
InputFormat.class))) {
+                try {
+                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
             final InputRDD inputRDD;
             final OutputRDD outputRDD;
             final boolean filtered;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
index a3b11c8..5b04e2d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -22,7 +22,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -73,7 +72,7 @@ public final class SparkMessenger<M> implements Messenger<M>
{
     ///////////
 
     private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final
Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(),
vertex));
+        incidentTraversal.asAdmin().addStart(incidentTraversal.asAdmin().getTraverserGenerator().generate(vertex,
incidentTraversal.asAdmin().getStartStep(), 1l));
         return (T) incidentTraversal;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/17bfdb82/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 0730ba8..a6010b6 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PageRankTest;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
@@ -50,7 +51,7 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider
{
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?>
test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
         final Map<String, Object> config = super.getBaseConfiguration(graphName, test,
testMethodName, loadGraphWith);
         config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test
suite go really fast
-        if (!test.equals(FileSystemStorageCheck.class) && null != loadGraphWith &&
RANDOM.nextBoolean()) {
+        if (!test.equals(PageRankTest.Traversals.class) && !test.equals(FileSystemStorageCheck.class)
&& null != loadGraphWith && RANDOM.nextBoolean()) {
             config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
         }
 


Mime
View raw message