giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 23184e1
Date Wed, 29 Jun 2016 01:44:23 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 77a24a9fc -> 23184e150


GIRAPH-1081: Fix a bug in internal out-of-core infra: multithreaded accesses to buffers

Summary: Multi-threaded access to raw data buffers in `DiskBackedDataStore` was overlooked,
violating the assumption that data is properly partitioned across different IO threads.

Test Plan: mvn clean verify

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D60147


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/23184e15
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/23184e15
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/23184e15

Branch: refs/heads/trunk
Commit: 23184e15023157312d7a8afe0adfe30b5d7864a8
Parents: 77a24a9
Author: Hassan Eslami <heslami@fb.com>
Authored: Tue Jun 28 18:43:18 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Tue Jun 28 18:43:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/ooc/data/DiskBackedDataStore.java  | 11 ++++++++---
 .../org/apache/giraph/ooc/data/MetaPartitionManager.java |  6 +++---
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/23184e15/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
index 7265410..e9ab167 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -354,16 +354,21 @@ public abstract class DiskBackedDataStore<T> {
    * offloaded to disk), and sees if any of them has enough raw data buffer in
    * memory. If so, puts that partition in a list to return.
    *
+   * @param ioThreadId Id of the IO thread who would offload the buffers
    * @return Set of partition ids of all partition raw buffers where the
    *         aggregate size of buffers are large enough and it is worth flushing
    *         those buffers to disk
    */
-  public Set<Integer> getCandidateBuffersToOffload() {
+  public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) {
     Set<Integer> result = new HashSet<>();
     for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
         dataBuffers.entrySet()) {
-      if (entry.getValue().getLeft() > minBufferSizeToOffload) {
-        result.add(entry.getKey());
+      int partitionId = entry.getKey();
+      long aggregateBufferSize = entry.getValue().getLeft();
+      if (aggregateBufferSize > minBufferSizeToOffload &&
+          oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
+              ioThreadId) {
+        result.add(partitionId);
       }
     }
     return result;

http://git-wip-us.apache.org/repos/asf/giraph/blob/23184e15/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 64e3aed..3075829 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -332,12 +332,12 @@ public class MetaPartitionManager {
             (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
                 .getPartitionStore());
         perThreadVertexEdgeBuffers.get(threadId)
-            .addAll(partitionStore.getCandidateBuffersToOffload());
+            .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
         DiskBackedEdgeStore<?, ?, ?> edgeStore =
             (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
                 .getEdgeStore();
         perThreadVertexEdgeBuffers.get(threadId)
-            .addAll(edgeStore.getCandidateBuffersToOffload());
+            .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
         partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
       }
       return partitionId;
@@ -361,7 +361,7 @@ public class MetaPartitionManager {
                 .getIncomingMessageStore());
         if (messageStore != null) {
           perThreadMessageBuffers.get(threadId)
-              .addAll(messageStore.getCandidateBuffersToOffload());
+              .addAll(messageStore.getCandidateBuffersToOffload(threadId));
           partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
         }
       }


Mime
View raw message