Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: okram@apache.org
To: commits@tinkerpop.incubator.apache.org
Date: Thu, 29 Oct 2015 15:02:14 -0000
Message-Id: <5b1628f8b39543e4929e640c6bd96f8d@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [1/5] incubator-tinkerpop git commit: Added GraphComputer.config(key, value). There is no general way to test this as general configurations outside the scope of the fluent methods of GraphComputer are particular to the underlying engine -- e.g. mapreduce

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 0578a41c1 -> d936191e8


Added GraphComputer.config(key,value). There is no general way to test this as general configurations outside the scope of the fluent methods of GraphComputer are particular to the underlying engine -- e.g. mapreduce.mappers etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e3d126fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e3d126fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e3d126fe

Branch: refs/heads/master
Commit: e3d126fe844d0b8015988fe1e3244cb98c9fbf5d
Parents: 37df079
Author: Marko A. Rodriguez
Authored: Tue Oct 13 19:09:22 2015 -0600
Committer: Marko A. Rodriguez
Committed: Tue Oct 13 19:09:22 2015 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  6 ++++
 .../gremlin/process/computer/GraphComputer.java | 15 +++++++--
 .../process/computer/SparkGraphComputer.java    | 35 ++++++++++++--------
 .../process/computer/TinkerGraphComputer.java   |  5 +++
 4 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 5923329..7c09259 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,6 +93,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        this.giraphConfiguration.set(key, value.toString());
+        return this;
+    }
+
+    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 0818ed8..2ae63e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -95,8 +95,8 @@ public interface GraphComputer {
     public GraphComputer mapReduce(final MapReduce mapReduce);
 
     /**
-     * Set the desired number of workers to execute the {@code VertexProgram} and {@code MapReduce} jobs.
-     * This is a recommendation to the underlying {@code GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
+     * Set the desired number of workers to execute the {@link VertexProgram} and {@link MapReduce} jobs.
+     * This is a recommendation to the underlying {@link GraphComputer} implementation and is allowed to deviate accordingly by the implementation.
      *
      * @param workers the number of workers to execute the submission
      * @return the updated GraphComputer with newly set worker count
@@ -104,6 +104,17 @@ public interface GraphComputer {
     public GraphComputer workers(final int workers);
 
     /**
+     * Set an arbitrary configuration key/value for the underlying {@link org.apache.commons.configuration.Configuration} in the {@link GraphComputer}.
+     * Typically, the other fluent methods in {@link GraphComputer} should be used to configure the computation.
+     * However, for some custom configuration in the underlying engine, this method should be used.
+     *
+     * @param key the key of the configuration
+     * @param value the value of the configuration
+     * @return the updated GraphComputer with newly set key/value configuration
+     */
+    public GraphComputer config(final String key, final Object value);
+
+    /**
      * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
      *
      * @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when its is complete.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 12ca39c..c3593c1 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -61,30 +61,39 @@ import java.util.stream.Stream;
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
+    private final org.apache.commons.configuration.Configuration sparkConfiguration;
+
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
+        this.sparkConfiguration = new HadoopConfiguration();
+        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
     }
 
     @Override
     public GraphComputer workers(final int workers) {
         super.workers(workers);
-        if (this.hadoopGraph.configuration().getString("spark.master").startsWith("local")) {
-            this.hadoopGraph.configuration().setProperty("spark.master", "local[" + this.workers + "]");
+        if (this.sparkConfiguration.getString("spark.master").startsWith("local")) {
+            this.sparkConfiguration.setProperty("spark.master", "local[" + this.workers + "]");
         }
         return this;
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        this.sparkConfiguration.setProperty(key, value);
+        return this;
+    }
+
+    @Override
     public Future submit() {
         super.validateStatePriorToExecution();
         // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+        this.sparkConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.sparkConfiguration);
         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
             try {
                 final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                this.sparkConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
                 hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
             } catch (final IOException e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -112,7 +121,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
             // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
             JavaSparkContext sparkContext = null;
-            try {
+            try {
                 sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
                 // add the project jars to the cluster
                 this.loadJars(sparkContext, hadoopConfiguration);
@@ -121,7 +130,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 try {
                     graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
                             .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .readGraphRDD(this.sparkConfiguration, sparkContext)
                             .setName("graphRDD")
                             .cache();
                 } catch (final InstantiationException | IllegalAccessException e) {
@@ -139,7 +148,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     memory.broadcastMemory(sparkContext);
                     final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                     this.vertexProgram.storeState(vertexProgramConfiguration);
-                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                    ConfigurationUtils.copy(vertexProgramConfiguration, this.sparkConfiguration);
                     ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                     // execute the vertex program
                     while (true) {
@@ -158,7 +167,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, graphRDD);
+                                    .writeGraphRDD(this.sparkConfiguration, graphRDD);
                         } catch (final InstantiationException | IllegalAccessException e) {
                             throw new IllegalStateException(e.getMessage(), e);
                         }
@@ -175,7 +184,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     final JavaPairRDD mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
@@ -189,9 +198,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
-            }
-            finally
-            {
+            } finally {
                 if (sparkContext != null && !hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     sparkContext.stop();
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3d126fe/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 07ad0c8..7092f99 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -92,6 +92,11 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
+    public GraphComputer config(final String key, final Object value) {
+        return this;
+    }
+
+    @Override
     public Future submit() {
         // a graph computer can only be executed once
         if (this.executed)
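
For readers following the config(key, value) contract described in the Javadoc above, here is a minimal, self-contained sketch of the fluent pattern. It does not use the real TinkerPop classes: FluentComputer and MapBackedComputer are hypothetical stand-ins invented for illustration, and the "workers" map key is likewise an assumption. The property names passed to config are ordinary Spark and Hadoop keys chosen only as examples of engine-specific settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigSketch {

    // Hypothetical stand-in for the fluent part of GraphComputer; not the real interface.
    interface FluentComputer {
        FluentComputer workers(int workers);

        FluentComputer config(String key, Object value);
    }

    // Toy engine that records every setting in a map, loosely mirroring how the
    // Spark implementation in this commit writes into its own configuration object.
    static final class MapBackedComputer implements FluentComputer {
        final Map<String, Object> settings = new LinkedHashMap<>();

        @Override
        public FluentComputer workers(final int workers) {
            // "workers" is an illustrative key, not one used by TinkerPop
            settings.put("workers", workers);
            return this;
        }

        @Override
        public FluentComputer config(final String key, final Object value) {
            // engine-specific keys pass through untouched
            settings.put(key, value);
            return this;
        }
    }

    public static void main(final String[] args) {
        final MapBackedComputer computer = new MapBackedComputer();
        // each call returns the same computer, so calls can be chained
        computer.workers(4)
                .config("spark.executor.memory", "1g")
                .config("mapreduce.job.maps", 8);
        System.out.println(computer.settings);
    }
}
```

Returning `this` from config is what lets it chain with the other fluent methods. An engine with nothing to configure can simply return `this` without storing anything, which is what the TinkerGraphComputer change in this commit does.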