tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject tinkerpop git commit: no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex> partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).
Date Wed, 04 Jan 2017 19:39:32 GMT
Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1585 0b7294695 -> 5aee91d5e


no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex>
partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5aee91d5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5aee91d5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5aee91d5

Branch: refs/heads/TINKERPOP-1585
Commit: 5aee91d5e19957efee7a5ea195c0459911208087
Parents: 0b72946
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Jan 4 12:39:35 2017 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Wed Jan 4 12:39:35 2017 -0700

----------------------------------------------------------------------
 .../process/computer/TinkerGraphComputer.java   | 14 +++-----
 .../process/computer/TinkerWorkerPool.java      | 36 +++++++++++++++++---
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5aee91d5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7523d63..2abce9a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -156,24 +156,21 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
         return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
-            final TinkerGraphComputerView view;
-            final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
+            final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph,
this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() :
Collections.emptySet());
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory,
this.workers);
             try {
                 if (null != this.vertexProgram) {
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter,
this.vertexProgram.getVertexComputeKeys());
                     // execute the vertex program
                     this.vertexProgram.setup(this.memory);
                     while (true) {
                         if (Thread.interrupted()) throw new TraversalInterruptedException();
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
-                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.executeVertexProgram((vertexProgram, workerMemory) ->
{
+                        workers.executeVertexProgram((vertices, vertexProgram, workerMemory)
-> {
                             vertexProgram.workerIterationStart(workerMemory.asImmutable());
-                            while (true) {
+                            while (vertices.hasNext()) {
                                 final Vertex vertex = vertices.next();
                                 if (Thread.interrupted()) throw new TraversalInterruptedException();
-                                if (null == vertex) break;
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard,
vertexProgram.getMessageCombiner()),
@@ -192,9 +189,6 @@ public final class TinkerGraphComputer implements GraphComputer {
                         }
                     }
                     view.complete(); // drop all transient vertex compute keys
-                } else {
-                    // MapReduce only
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter,
Collections.emptySet());
                 }
 
                 // execute mapreduce jobs

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5aee91d5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 140d347..637b416 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -23,14 +23,20 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
+import org.apache.tinkerpop.gremlin.util.function.TriConsumer;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -48,13 +54,33 @@ public final class TinkerWorkerPool implements AutoCloseable {
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
     private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
+    private final List<List<Vertex>> workerVertices = new ArrayList<>();
 
-    public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
+    public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int
numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
         this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
         this.completionService = new ExecutorCompletionService<>(this.workerPool);
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+            this.workerVertices.add(new ArrayList<>());
+        }
+        int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers;
+        if (0 == batchSize)
+            batchSize = 1;
+        int counter = 0;
+        int index = 0;
+
+        List<Vertex> currentWorkerVertices = this.workerVertices.get(index);
+        final Iterator<Vertex> iterator = graph.vertices();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next();
+            if (counter++ < batchSize || index == this.workerVertices.size() - 1) {
+                currentWorkerVertices.add(vertex);
+            } else {
+                currentWorkerVertices = this.workerVertices.get(++index);
+                currentWorkerVertices.add(vertex);
+                counter = 1;
+            }
         }
     }
 
@@ -66,12 +92,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory>
worker) throws InterruptedException {
+    public void executeVertexProgram(final TriConsumer<Iterator<Vertex>, VertexProgram,
TinkerWorkerMemory> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
+            final int index = i;
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
                 final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
-                worker.accept(vp, workerMemory);
+                final List<Vertex> vertices = this.workerVertices.get(index);
+                worker.accept(vertices.iterator(), vp, workerMemory);
                 this.vertexProgramPool.offer(vp);
                 this.workerMemoryPool.offer(workerMemory);
                 return null;


Mime
View raw message