giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to b0262f8
Date Tue, 12 Jul 2016 18:38:31 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk b51ecd27c -> b0262f8c8


[GIRAPH-1089] Fix a bug in out-of-core infrastructure

Summary: This diff fixes a bug in the out-of-core infrastructure that caused the user requirement
(max number of partitions in memory) for the fixed out-of-core strategy to be violated. The cause
of the problem was the unclear definition of in-memory partitions. In this diff, we distinguish
the partitions that are entirely in memory from those that are partially in memory.

Test Plan:
mvn clean verify

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D60573


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b0262f8c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b0262f8c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b0262f8c

Branch: refs/heads/trunk
Commit: b0262f8c81c352c0cf3ac11e1e98646aa9587944
Parents: b51ecd2
Author: Hassan Eslami <heslami@fb.com>
Authored: Tue Jul 12 11:33:38 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Tue Jul 12 11:33:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |   2 +-
 .../apache/giraph/ooc/OutOfCoreIOCallable.java  |   4 +-
 .../apache/giraph/ooc/OutOfCoreIOScheduler.java |   4 +-
 .../giraph/ooc/data/MetaPartitionManager.java   | 202 ++++++++++++++++---
 .../ooc/persistence/LocalDiskDataAccessor.java  |   4 -
 .../ooc/policy/FixedPartitionsOracle.java       |  15 +-
 .../giraph/partition/TestPartitionStores.java   |  18 +-
 .../java/org/apache/giraph/TestOutOfCore.java   |  49 +++--
 8 files changed, 230 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index d5bfd4f..65399b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -491,7 +491,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver
{
     superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
       @Override
       public Double value() {
-        return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
+        return metaPartitionManager.getGraphFractionInMemory() * 100;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index 829ad80..c21be95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -76,7 +76,7 @@ public class OutOfCoreIOCallable implements Callable<Void>,
     while (true) {
       oocEngine.getSuperstepLock().readLock().lock();
       IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
-      if (LOG.isInfoEnabled()) {
+      if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
         LOG.info("call: thread " + diskId + "'s next IO command is: " +
             command);
       }
@@ -101,7 +101,7 @@ public class OutOfCoreIOCallable implements Callable<Void>,
         timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
             .getSuperstepGCTime() - timeInGC;
         bytes = command.bytesTransferred();
-        if (LOG.isInfoEnabled()) {
+        if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
           LOG.info("call: thread " + diskId + "'s command " + command +
               " completed: bytes= " + bytes + ", duration=" + duration + ", " +
               "bandwidth=" + String.format("%.2f", (double) bytes / duration *

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
index 906607d..3dc1019 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -102,8 +102,8 @@ public class OutOfCoreIOScheduler {
       }
       OutOfCoreOracle.IOAction[] actions =
           oocEngine.getOracle().getNextIOActions();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getNextIOCommand: actions are " + Arrays.toString(actions));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
       }
       // Check whether there are any urgent outstanding load requests
       if (!threadLoadCommandQueue.get(threadId).isEmpty()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 3075829..173b451 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -54,13 +54,31 @@ public class MetaPartitionManager {
   /** Different storage states for data */
   private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
   /**
+   * Different storage states for a partition as a whole (i.e. the partition
+   * and its current messages)
+   */
+  private enum PartitionStorageState
+    /**
+     * Either both partition and its current messages are in memory, or both
+     * are on disk, or one part is on disk and the other part is in memory.
+     */
+  { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
+  /**
    * Different processing states for partitions. Processing states are reset
    * at the beginning of each iteration cycle over partitions.
    */
   private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
 
-  /** Number of in-memory partitions */
+  /**
+   * Number of partitions in-memory (partition and current messages in memory)
+   */
   private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
+  /**
+   * Number of partitions that are partially in-memory (either partition or its
+   * current messages is in memory and the other part is not)
+   */
+  private final AtomicInteger numPartiallyInMemoryPartitions =
+      new AtomicInteger(0);
   /** Map (dictionary) of partitions to their meta information */
   private final ConcurrentMap<Integer, MetaPartition> partitions =
       Maps.newConcurrentMap();
@@ -136,8 +154,6 @@ public class MetaPartitionManager {
   }
 
   /**
-   * Get number of partitions in memory
-   *
    * @return number of partitions in memory
    */
   public int getNumInMemoryPartitions() {
@@ -145,6 +161,13 @@ public class MetaPartitionManager {
   }
 
   /**
+   * @return number of partitions that are partially in memory
+   */
+  public int getNumPartiallyInMemoryPartitions() {
+    return numPartiallyInMemoryPartitions.get();
+  }
+
+  /**
    * Get total number of partitions
    *
    * @return total number of partitions
@@ -153,8 +176,16 @@ public class MetaPartitionManager {
     return partitions.size();
   }
 
-  public double getLowestGraphFractionInMemory() {
-    return lowestGraphFractionInMemory.get();
+  /**
+   * Since the statistics are based on estimates, we assume each partial
+   * partition is taking about half of the full partition in terms of memory
+   * footprint.
+   *
+   * @return estimate of fraction of graph in memory
+   */
+  public double getGraphFractionInMemory() {
+    return (getNumInMemoryPartitions() +
+        getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
   }
 
   /**
@@ -162,8 +193,7 @@ public class MetaPartitionManager {
    * information in one of the counters.
    */
   private synchronized void updateGraphFractionInMemory() {
-    double graphInMemory =
-        (double) getNumInMemoryPartitions() / getNumPartitions();
+    double graphInMemory = getGraphFractionInMemory();
     if (graphInMemory < lowestGraphFractionInMemory.get()) {
       lowestGraphFractionInMemory.set(graphInMemory);
       WorkerProgress.get().updateLowestGraphPercentageInMemory(
@@ -172,6 +202,26 @@ public class MetaPartitionManager {
   }
 
   /**
+   * Update the book-keeping about number of in-memory partitions and partially
+   * in-memory partitions with regard to the storage status of the partition and
+   * its current messages before and after an update to its status.
+   *
+   * @param stateBefore the storage state of the partition and its current
+   *                    messages before an update
+   * @param stateAfter the storage state of the partition and its current
+   *                   messages after an update
+   */
+  private void updateCounters(PartitionStorageState stateBefore,
+                              PartitionStorageState stateAfter) {
+    numInMemoryPartitions.getAndAdd(
+        ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
+            ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
+    numPartiallyInMemoryPartitions.getAndAdd(
+        ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
+            ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
+  }
+
+  /**
    * Whether a given partition is available
    *
    * @param partitionId id of the partition to check if this worker owns it
@@ -266,49 +316,63 @@ public class MetaPartitionManager {
   }
 
   /**
-   * Get id of a partition to offload on disk
+   * Get id of a partition to offload to disk. Prioritize offloading processed
+   * partitions over unprocessed partition. Also, prioritize offloading
+   * partitions partially in memory over partitions fully in memory.
    *
    * @param threadId id of the thread who is going to store the partition on
    *                 disk
    * @return id of the partition to offload on disk
    */
   public Integer getOffloadPartitionId(int threadId) {
+    // First, look for a processed partition partially on disk
     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
         StorageState.IN_MEM,
-        StorageState.IN_MEM,
+        StorageState.ON_DISK,
         null);
     if (meta != null) {
       return meta.getPartitionId();
     }
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
+        StorageState.ON_DISK,
         StorageState.IN_MEM,
-        null,
         null);
     if (meta != null) {
       return meta.getPartitionId();
     }
+    // Second, look for a processed partition entirely in memory
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
-        null,
+        StorageState.IN_MEM,
         StorageState.IN_MEM,
         null);
     if (meta != null) {
       return meta.getPartitionId();
     }
 
+    // Third, look for an unprocessed partition partially on disk
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.UNPROCESSED,
         StorageState.IN_MEM,
-        null,
+        StorageState.ON_DISK,
         null);
     if (meta != null) {
       return meta.getPartitionId();
     }
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.UNPROCESSED,
-        null,
+        StorageState.ON_DISK,
+        StorageState.IN_MEM,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+    // Fourth, look for an unprocessed partition entirely in memory
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.IN_MEM,
         StorageState.IN_MEM,
         null);
     if (meta != null) {
@@ -371,7 +435,11 @@ public class MetaPartitionManager {
   }
 
   /**
-   * Get id of a partition to offload its incoming message on disk
+   * Get id of a partition to offload its incoming message on disk. Prioritize
+   * offloading messages of partitions already on disk, and then partitions
+   * in-transit, over partitions in-memory. Also, prioritize processed
+   * partitions over unprocessed (processed partitions would go on disk with
+   * more chances than unprocessed partitions)
    *
    * @param threadId id of the thread who is going to store the incoming
    *                 messages on disk
@@ -389,7 +457,14 @@ public class MetaPartitionManager {
     if (meta != null) {
       return meta.getPartitionId();
     }
-
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.IN_TRANSIT,
+        null,
+        StorageState.IN_MEM);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.UNPROCESSED,
         StorageState.ON_DISK,
@@ -398,16 +473,27 @@ public class MetaPartitionManager {
     if (meta != null) {
       return meta.getPartitionId();
     }
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.IN_TRANSIT,
+        null,
+        StorageState.IN_MEM);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
     return null;
   }
 
   /**
-   * Get id of a partition to load its data to memory
+   * Get id of a partition to load its data to memory. Prioritize loading an
+   * unprocessed partition over loading processed partition. Also, prioritize
+   * loading a partition partially in memory over partitions entirely on disk.
    *
    * @param threadId id of the thread who is going to load the partition data
    * @return id of the partition to load its data to memory
    */
   public Integer getLoadPartitionId(int threadId) {
+    // First, look for an unprocessed partition partially in memory
     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.UNPROCESSED,
         StorageState.IN_MEM,
@@ -420,29 +506,51 @@ public class MetaPartitionManager {
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.UNPROCESSED,
         StorageState.ON_DISK,
-        null,
+        StorageState.IN_MEM,
         null);
     if (meta != null) {
       return meta.getPartitionId();
     }
 
+    // Second, look for an unprocessed partition entirely on disk
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.ON_DISK,
+        StorageState.ON_DISK,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    // Third, look for a processed partition partially in memory
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
+        StorageState.IN_MEM,
         StorageState.ON_DISK,
-        null,
         null);
     if (meta != null) {
-      meta.getPartitionId();
+      return meta.getPartitionId();
     }
 
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
-        null,
         StorageState.ON_DISK,
+        StorageState.IN_MEM,
         null);
     if (meta != null) {
-      meta.getPartitionId();
+      return meta.getPartitionId();
     }
+
+    // Fourth, look for a processed partition entirely on disk
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.ON_DISK,
+        StorageState.ON_DISK,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
     return null;
   }
 
@@ -536,9 +644,9 @@ public class MetaPartitionManager {
    */
   public void doneLoadingPartition(int partitionId, long superstep) {
     MetaPartition meta = partitions.get(partitionId);
-    numInMemoryPartitions.getAndIncrement();
     int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
+      PartitionStorageState stateBefore = meta.getPartitionStorageState();
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.IN_MEM);
       if (superstep == oocEngine.getSuperstep()) {
@@ -546,6 +654,8 @@ public class MetaPartitionManager {
       } else {
         meta.setIncomingMessagesState(StorageState.IN_MEM);
       }
+      PartitionStorageState stateAfter = meta.getPartitionStorageState();
+      updateCounters(stateBefore, stateAfter);
       // Check whether load was to prefetch a partition from disk to memory for
       // the next superstep
       if (meta.getProcessingState() == ProcessingState.PROCESSED) {
@@ -553,6 +663,7 @@ public class MetaPartitionManager {
       }
       perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
+    updateGraphFractionInMemory();
   }
 
   /**
@@ -631,8 +742,16 @@ public class MetaPartitionManager {
           (meta.getPartitionState() == StorageState.IN_MEM ||
           meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
         perThreadPartitionDictionary.get(owner).removePartition(meta);
-        meta.setPartitionState(StorageState.IN_TRANSIT);
-        meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+        // We may only need to offload either partition or current messages of
+        // that partition to disk. So, if either of the components (partition
+        // or its current messages) is already on disk, we should not update its
+        // metadata.
+        if (meta.getPartitionState() != StorageState.ON_DISK) {
+          meta.setPartitionState(StorageState.IN_TRANSIT);
+        }
+        if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
+          meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+        }
         perThreadPartitionDictionary.get(owner).addPartition(meta);
         return true;
       } else {
@@ -648,16 +767,23 @@ public class MetaPartitionManager {
    * @param partitionId id of the partition that its data is offloaded
    */
   public void doneOffloadingPartition(int partitionId) {
-    numInMemoryPartitions.getAndDecrement();
-    updateGraphFractionInMemory();
     MetaPartition meta = partitions.get(partitionId);
     int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
+      // We either offload both partition and its messages to disk, or we only
+      // offload one of the components.
+      if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
+          meta.getPartitionState() == StorageState.IN_TRANSIT) {
+        numInMemoryPartitions.getAndDecrement();
+      } else {
+        numPartiallyInMemoryPartitions.getAndDecrement();
+      }
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.ON_DISK);
       meta.setCurrentMessagesState(StorageState.ON_DISK);
       perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
+    updateGraphFractionInMemory();
   }
 
   /**
@@ -675,8 +801,6 @@ public class MetaPartitionManager {
       dictionary.reset();
     }
     numPartitionsProcessed.set(0);
-    lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() /
-        getNumPartitions());
   }
 
   /**
@@ -687,11 +811,10 @@ public class MetaPartitionManager {
     for (MetaPartition meta : partitions.values()) {
       int owner = getOwnerThreadId(meta.getPartitionId());
       perThreadPartitionDictionary.get(owner).removePartition(meta);
+      PartitionStorageState stateBefore = meta.getPartitionStorageState();
       meta.resetMessages();
-      if (meta.getPartitionState() == StorageState.IN_MEM &&
-          meta.getCurrentMessagesState() == StorageState.ON_DISK) {
-        numInMemoryPartitions.getAndDecrement();
-      }
+      PartitionStorageState stateAfter = meta.getPartitionStorageState();
+      updateCounters(stateBefore, stateAfter);
       perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
   }
@@ -863,6 +986,21 @@ public class MetaPartitionManager {
       currentMessagesState = incomingMessagesState;
       incomingMessagesState = StorageState.IN_MEM;
     }
+
+    /**
+     * @return the state of the partition and its current messages as a whole
+     */
+    public PartitionStorageState getPartitionStorageState() {
+      if (partitionState == StorageState.ON_DISK &&
+          currentMessagesState == StorageState.ON_DISK) {
+        return PartitionStorageState.FULLY_ON_DISK;
+      } else if (partitionState == StorageState.IN_MEM &&
+          currentMessagesState == StorageState.IN_MEM) {
+        return PartitionStorageState.FULLY_IN_MEM;
+      } else {
+        return PartitionStorageState.PARTIALLY_IN_MEM;
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
index 2e42906..8efa9de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -168,8 +168,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
     LocalDiskDataInputWrapper(String fileName, byte[] buffer)
         throws IOException {
       file = new File(fileName);
-      LOG.info("LocalDiskDataInputWrapper: obtaining a data input from local " +
-          "file " + file.getAbsolutePath());
       if (LOG.isDebugEnabled()) {
         LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
             "local file " + file.getAbsolutePath());
@@ -216,8 +214,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
     LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
                                byte[] buffer) throws IOException {
       file = new File(fileName);
-      LOG.info("LocalDiskDataOutputWrapper: obtaining a data output from " +
-          "local file " + file.getAbsolutePath());
       if (LOG.isDebugEnabled()) {
         LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
             "local file " + file.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
index ffc5f7f..002dc85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -29,6 +29,8 @@ import org.apache.log4j.Logger;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /** Oracle for fixed out-of-core mechanism */
 public class FixedPartitionsOracle implements OutOfCoreOracle {
   /** Class logger */
@@ -63,11 +65,16 @@ public class FixedPartitionsOracle implements OutOfCoreOracle {
   public IOAction[] getNextIOActions() {
     int numPartitionsInMemory =
         oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
-          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
-          " to be loaded");
+    int numPartialPartitionsInMemory =
+        oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
+          " partitions entirely in memory and " + numPartialPartitionsInMemory +
+          " partitions partially in memory, " +
+          deltaNumPartitionsInMemory.get() + " to be loaded");
     }
+    checkState(numPartitionsInMemory >= 0);
+    checkState(numPartialPartitionsInMemory >= 0);
     int numPartitions =
         numPartitionsInMemory + deltaNumPartitionsInMemory.get();
     // Fixed out-of-core policy:

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index a7451bc..1e4593b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -164,12 +164,12 @@ public class TestPartitionStores {
       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = Mockito.mock(GraphTaskManager.class);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
-    GraphTaskManager<IntWritable, IntWritable, NullWritable>
-        graphTaskManager = new GraphTaskManager<>(context);
-    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         partitionStore =
@@ -193,12 +193,12 @@ public class TestPartitionStores {
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = Mockito.mock(GraphTaskManager.class);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
-    GraphTaskManager<IntWritable, IntWritable, NullWritable>
-        graphTaskManager = new GraphTaskManager<>(context);
-    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         partitionStore =
@@ -311,12 +311,12 @@ public class TestPartitionStores {
 
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = Mockito.mock(GraphTaskManager.class);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
-    GraphTaskManager<IntWritable, IntWritable, NullWritable>
-        graphTaskManager = new GraphTaskManager<>(context);
-    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         store =

http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index 397605d..e497541 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -27,6 +27,9 @@ import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertex
 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
 
 import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.ooc.OutOfCoreIOScheduler;
+import org.apache.giraph.ooc.persistence.InMemoryDataAccessor;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -38,24 +41,17 @@ import static org.junit.Assert.assertTrue;
  * Unit test for out-of-core mechanism
  */
 public class TestOutOfCore extends BspCase {
-  final static int NUM_PARTITIONS = 32;
-  final static int NUM_PARTITIONS_IN_MEMORY = 16;
+  private final static int NUM_PARTITIONS = 400;
+  private final static int NUM_PARTITIONS_IN_MEMORY = 8;
+  private GiraphConfiguration conf;
 
   public TestOutOfCore() {
       super(TestOutOfCore.class.getName());
   }
 
-  /**
-   * Run a job that tests the fixed out-of-core mechanism
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testOutOfCore()
-          throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphConfiguration conf = new GiraphConfiguration();
+  @Before
+  public void prepareTest() {
+    conf = new GiraphConfiguration();
     conf.setComputationClass(SimplePageRankComputation.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
@@ -66,12 +62,37 @@ public class TestOutOfCore extends BspCase {
     GiraphConstants.METRICS_ENABLE.set(conf, true);
     GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
-    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
+    OutOfCoreIOScheduler.OOC_WAIT_INTERVAL.set(conf, 10);
     GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8);
     GiraphConstants.NUM_INPUT_THREADS.set(conf, 8);
     GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8);
+  }
+
+  @Test
+  public void testOutOfCoreInMemoryAccessor()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.set(conf, InMemoryDataAccessor.class);
+    GiraphConstants.NUM_OUT_OF_CORE_THREADS.set(conf, 8);
+    runTest();
+  }
+
+  @Test
+  public void testOutOfCoreLocalDiskAccessor()
+    throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2");
+    runTest();
+  }
+
+  /**
+   * Run a job with fixed out-of-core policy and verify the result
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void runTest()
+      throws IOException, InterruptedException, ClassNotFoundException {
     GiraphJob job = prepareJob(getCallingMethodName(), conf,
         getTempPath(getCallingMethodName()));
     // Overwrite the number of vertices set in BspCase


Mime
View raw message