tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [04/50] [abbrv] tinkerpop git commit: came up with a much cleaner GiraphGraphComputer usage around workers.
Date Thu, 19 Jan 2017 17:40:10 GMT
came up with a much cleaner GiraphGraphComputer usage around workers.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/48520308
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/48520308
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/48520308

Branch: refs/heads/TINKERPOP-1564
Commit: 4852030816a4f39cf6787c60a59dae0c80bc2cf6
Parents: 7f597d5
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Mon Dec 19 14:31:29 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Jan 19 10:26:57 2017 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48520308/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index db4d6da..b316220 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -58,7 +58,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
@@ -81,26 +80,14 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
 
     protected GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
     private MapMemory memory = new MapMemory();
-    private boolean useWorkerThreadsInConfiguration;
     private Set<String> vertexProgramConfigurationKeys = new HashSet<>();
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
-       this(hadoopGraph.configuration());
+        this(hadoopGraph.configuration());
     }
 
     private GiraphGraphComputer(final Configuration configuration) {
         super(configuration);
-        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
-        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
-        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
-        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
-        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
-        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
-        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
-        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
-        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
     }
 
     public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) {
@@ -108,14 +95,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
-    public Future<ComputerResult> submit(final Graph graph) {
-        final Configuration configuration = graph.configuration();
-        this.configuration.copy(configuration);
-        configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
-        return this.submit();
-    }
-
-    @Override
     public GraphComputer program(final VertexProgram vertexProgram) {
         super.program(vertexProgram);
         this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram);
@@ -129,6 +108,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
+    public GraphComputer workers(final int workers) {
+        this.configuration.clearProperty(GiraphConstants.MAX_WORKERS);
+        this.configuration.clearProperty(GiraphConstants.NUM_COMPUTE_THREADS.getKey());
+        return super.workers(workers);
+    }
+
+    @Override
     public Future<ComputerResult> submit() {
         super.validateStatePriorToExecution();
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
@@ -137,7 +123,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     private Future<ComputerResult> submitWithExecutor(final Executor exec) {
         final long startTime = System.currentTimeMillis();
         this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString()));
-        this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
+        this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
+        this.giraphConfiguration.setVertexClass(GiraphVertex.class);
+        this.giraphConfiguration.setComputationClass(GiraphComputation.class);
+        this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
+        this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
+        this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
+        this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
+        this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
+        this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
         final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         ConfigurationUtils.copy(this.configuration, apacheConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -181,7 +176,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 if (!this.vertexProgram.getMessageCombiner().isPresent())
                     this.giraphConfiguration.unset(GiraphConstants.MESSAGE_COMBINER_CLASS.getKey());
                 // split required workers across system (open map slots + max threads per machine = total amount of TinkerPop workers)
-                if (!this.useWorkerThreadsInConfiguration) {
+                if (!(this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 ||
+                        this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666)) {
                     final Cluster cluster = new Cluster(GiraphGraphComputer.this.giraphConfiguration);
                     int totalMappers = cluster.getClusterStatus().getMapSlotCapacity() - 1; // 1 is needed for master
                     cluster.close();


Mime
View raw message