tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [1/5] incubator-tinkerpop git commit: Added GraphComputer.config(key, value). There is no general way to test this as general configurations outside the scope of the fluent methods of GraphComputer are particular to the underlying engine -- e.g. mapreduce
Date Thu, 29 Oct 2015 15:02:14 GMT
Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 0578a41c1 -> d936191e8


Added GraphComputer.config(key,value). There is no general way to test this, because configurations
outside the scope of the fluent methods of GraphComputer are particular to the underlying
engine -- e.g. mapreduce.mappers, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e3d126fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e3d126fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e3d126fe

Branch: refs/heads/master
Commit: e3d126fe844d0b8015988fe1e3244cb98c9fbf5d
Parents: 37df079
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Tue Oct 13 19:09:22 2015 -0600
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Tue Oct 13 19:09:22 2015 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  6 ++++
 .../gremlin/process/computer/GraphComputer.java | 15 +++++++--
 .../process/computer/SparkGraphComputer.java    | 35 ++++++++++++--------
 .../process/computer/TinkerGraphComputer.java   |  5 +++
 4 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 5923329..7c09259 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,6 +93,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer
imple
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        this.giraphConfiguration.set(key, value.toString());
+        return this;
+    }
+
+    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 0818ed8..2ae63e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -95,8 +95,8 @@ public interface GraphComputer {
     public GraphComputer mapReduce(final MapReduce mapReduce);
 
     /**
-     * Set the desired number of workers to execute the {@code VertexProgram} and {@code MapReduce} jobs.
-     * This is a recommendation to the underlying {@code GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
+     * Set the desired number of workers to execute the {@link VertexProgram} and {@link MapReduce} jobs.
+     * This is a recommendation to the underlying {@link GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
      *
      * @param workers the number of workers to execute the submission
      * @return the updated GraphComputer with newly set worker count
@@ -104,6 +104,17 @@ public interface GraphComputer {
     public GraphComputer workers(final int workers);
 
     /**
+     * Set an arbitrary configuration key/value for the underlying {@link org.apache.commons.configuration.Configuration} in the {@link GraphComputer}.
+     * Typically, the other fluent methods in {@link GraphComputer} should be used to configure the computation.
+     * However, for some custom configuration in the underlying engine, this method should be used.
+     *
+     * @param key   the key of the configuration
+     * @param value the value of the configuration
+     * @return the updated GraphComputer with newly set key/value configuration
+     */
+    public GraphComputer config(final String key, final Object value);
+
+    /**
      * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
      *
      * @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when it is complete.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 12ca39c..c3593c1 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -61,30 +61,39 @@ import java.util.stream.Stream;
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
+    private final org.apache.commons.configuration.Configuration sparkConfiguration;
+
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
+        this.sparkConfiguration = new HadoopConfiguration();
+        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
     }
 
     @Override
     public GraphComputer workers(final int workers) {
         super.workers(workers);
-        if (this.hadoopGraph.configuration().getString("spark.master").startsWith("local")) {
-            this.hadoopGraph.configuration().setProperty("spark.master", "local[" + this.workers + "]");
+        if (this.sparkConfiguration.getString("spark.master").startsWith("local")) {
+            this.sparkConfiguration.setProperty("spark.master", "local[" + this.workers + "]");
         }
         return this;
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        this.sparkConfiguration.setProperty(key, value);
+        return this;
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
         // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+        this.sparkConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.sparkConfiguration);
         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
             try {
                 final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                this.sparkConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
                 hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
             } catch (final IOException e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -112,7 +121,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
             hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(),
entry.getValue()));
             // execute the vertex program and map reducers and if there is a failure, auto-close
the spark context
             JavaSparkContext sparkContext = null;
-            try  {
+            try {
                 sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
                 // add the project jars to the cluster
                 this.loadJars(sparkContext, hadoopConfiguration);
@@ -121,7 +130,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                 try {
                     graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD,
InputFormatRDD.class, InputRDD.class)
                             .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .readGraphRDD(this.sparkConfiguration, sparkContext)
                             .setName("graphRDD")
                             .cache();
                 } catch (final InstantiationException | IllegalAccessException e) {
@@ -139,7 +148,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                     memory.broadcastMemory(sparkContext);
                     final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                     this.vertexProgram.storeState(vertexProgramConfiguration);
-                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                    ConfigurationUtils.copy(vertexProgramConfiguration, this.sparkConfiguration);
                     ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration,
hadoopConfiguration);
                     // execute the vertex program
                     while (true) {
@@ -158,7 +167,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD,
OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, graphRDD);
+                                    .writeGraphRDD(this.sparkConfiguration, graphRDD);
                         } catch (final InstantiationException | IllegalAccessException e)
{
                             throw new IllegalStateException(e.getMessage(), e);
                         }
@@ -175,7 +184,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                     final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD,
viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD)
mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
@@ -189,9 +198,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer
{
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph,
this.resultGraph, this.persist), finalMemory.asImmutable());
-            }
-            finally
-            {
+            } finally {
                 if (sparkContext != null && !hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT,
false))
                     sparkContext.stop();
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 07ad0c8..7092f99 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -92,6 +92,11 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        return this;
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)


Mime
View raw message