giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject [2/2] git commit: updated refs/heads/trunk to 6f5a457
Date Thu, 30 Jul 2015 21:07:56 GMT
[GIRAPH-1023] Adding out-of-core messages to previously implemented adaptive out-of-core mechanism

Summary:
This is the continuation of the previous diff on out-of-core mechanism. This diff completes
the last diff by adding out-of-core messages, making the entire out-of-core mechanism a cohesive
entity in Giraph.

This diff also improves the API of PartitionStore by some minor refactoring.

Test Plan:
mvn clean verify
running pagerank and turning message combiner off on a large graph with limited memory does
not fail

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D42897


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6f5a457f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6f5a457f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6f5a457f

Branch: refs/heads/trunk
Commit: 6f5a457fa93ac8b53cc77afd0eb8729f1e5665af
Parents: 4f3551d
Author: Hassan Eslami <heslami@fb.com>
Authored: Thu Jul 30 14:03:12 2015 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Thu Jul 30 14:03:46 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +
 .../java/org/apache/giraph/comm/ServerData.java |  91 +--
 .../ByteArrayMessagesPerVertexStore.java        |   5 +-
 .../giraph/comm/messages/MessageData.java       |  82 ++
 .../giraph/comm/messages/MessageStore.java      |   8 +
 .../comm/messages/SimpleMessageStore.java       |   5 +
 .../out_of_core/DiskBackedMessageStore.java     |   7 +
 .../primitives/IdByteArrayMessageStore.java     |   5 +
 .../primitives/IdOneMessagePerVertexStore.java  |   5 +
 .../primitives/IntByteArrayMessageStore.java    |   5 +
 .../primitives/IntFloatMessageStore.java        |   5 +
 .../primitives/LongDoubleMessageStore.java      |   5 +
 .../long_id/LongAbstractMessageStore.java       |   6 +-
 .../queue/AsyncMessageStoreWrapper.java         |   5 +
 .../NettyWorkerClientRequestProcessor.java      |   2 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |  26 +-
 .../SendPartitionCurrentMessagesRequest.java    |   4 +-
 .../requests/SendWorkerMessagesRequest.java     |   4 +-
 .../SendWorkerOneMessageToManyRequest.java      |  13 +-
 .../apache/giraph/graph/ComputeCallable.java    |   1 +
 .../apache/giraph/graph/GraphTaskManager.java   |   3 +-
 .../giraph/ooc/AdaptiveOutOfCoreEngine.java     |  20 +-
 .../apache/giraph/ooc/CheckMemoryCallable.java  |  42 +-
 .../giraph/ooc/DiskBackedPartitionStore.java    | 750 ++++++++++++++-----
 .../giraph/ooc/OutOfCoreProcessorCallable.java  |  31 +-
 .../apache/giraph/partition/PartitionData.java  | 116 +++
 .../apache/giraph/partition/PartitionStore.java | 182 +++--
 .../giraph/partition/SimplePartitionStore.java  |  39 +-
 .../apache/giraph/worker/BspServiceWorker.java  |  10 +-
 .../apache/giraph/comm/RequestFailureTest.java  |   7 +-
 .../org/apache/giraph/comm/RequestTest.java     |  14 +-
 .../queue/AsyncMessageStoreWrapperTest.java     |   5 +
 .../giraph/partition/TestPartitionStores.java   |  59 +-
 .../java/org/apache/giraph/utils/MockUtils.java |   8 +-
 .../java/org/apache/giraph/TestOutOfCore.java   |  38 +-
 35 files changed, 1169 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c844f61..c2306e9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 1.2.0 - unreleased
 =======
+  GIRAPH-1023: Adding out-of-core messages to previously implemented adaptive out-of-core
+  mechanism (heslami via aching)
+
   GIRAPH-1022: Out-of-core mechanism for input superstep and graph data (heslami via aching)
 
   GIRAPH-1021: Missing progress report for graph mutations. (heslami via aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index eddfbc6..b177446 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -31,9 +31,6 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
@@ -65,19 +62,6 @@ public class ServerData<I extends WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition store for this worker. */
   private volatile PartitionStore<I, V, E> partitionStore;
-  /** Message store factory */
-  private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
-  messageStoreFactory;
-  /**
-   * Message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   */
-  private volatile MessageStore<I, Writable> incomingMessageStore;
-  /**
-   * Message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   */
-  private volatile MessageStore<I, Writable> currentMessageStore;
   /**
    * Map of partition ids to vertex mutations from other workers. These are
    * mutations that should be applied before execution of *current* super step.
@@ -122,18 +106,14 @@ public class ServerData<I extends WritableComparable,
    *
    * @param service Service worker
    * @param conf Configuration
-   * @param messageStoreFactory Factory for message stores
    * @param context Mapper context
    */
   public ServerData(
       CentralizedServiceWorker<I, V, E> service,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
-          messageStoreFactory,
       Mapper<?, ?, ?, ?>.Context context) {
     this.serviceWorker = service;
     this.conf = conf;
-    this.messageStoreFactory = messageStoreFactory;
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
       partitionStore =
           new DiskBackedPartitionStore<I, V, E>(conf, context,
@@ -157,78 +137,27 @@ public class ServerData<I extends WritableComparable,
   }
 
   /**
-   * Get message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   *
-   * @param <M> Message data
-   * @return Incoming message store
-   */
-  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
-    return (MessageStore<I, M>) incomingMessageStore;
-  }
-
-  /**
-   * Get message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   *
-   * @param <M> Message data
-   * @return Current message store
-   */
-  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
-    return (MessageStore<I, M>) currentMessageStore;
-  }
-
-  /**
    * Re-initialize message stores.
    * Discards old values if any.
    * @throws IOException
    */
   public void resetMessageStores() throws IOException {
-    if (currentMessageStore != null) {
-      currentMessageStore.clearAll();
-      currentMessageStore = null;
-    }
-    if (incomingMessageStore != null) {
-      incomingMessageStore.clearAll();
-      incomingMessageStore = null;
-    }
-    prepareSuperstep();
+    getPartitionStore().resetMessageStores();
+    currentWorkerToWorkerMessages =
+        Collections.synchronizedList(new ArrayList<Writable>());
+    incomingWorkerToWorkerMessages =
+        Collections.synchronizedList(new ArrayList<Writable>());
   }
 
   /** Prepare for next super step */
   public void prepareSuperstep() {
-    if (currentMessageStore != null) {
-      try {
-        currentMessageStore.clearAll();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to clear previous message store");
-      }
-    }
-    currentMessageStore =
-        incomingMessageStore != null ? incomingMessageStore :
-            messageStoreFactory.newStore(conf.getIncomingMessageClasses());
-    incomingMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
-    // finalize current message-store before resolving mutations
-    currentMessageStore.finalizeStore();
-
+    partitionStore.prepareSuperstep();
     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
     incomingWorkerToWorkerMessages =
         Collections.synchronizedList(new ArrayList<Writable>());
   }
 
   /**
-   * In case of async message store we have to wait for all messages
-   * to be processed before going into next superstep.
-   */
-  public void waitForComplete() {
-    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
-      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
-    }
-  }
-
-  /**
    * Get the vertex mutations (synchronize on the values)
    *
    * @return Vertex mutations
@@ -323,7 +252,8 @@ public class ServerData<I extends WritableComparable,
         VertexMutations<I, V, E> vertexMutations = entry.getValue();
         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
             originalVertex, vertexMutations,
-            getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
+            getPartitionStore().getCurrentMessageStore()
+                .hasMessagesForVertex(entry.getKey()));
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
@@ -339,7 +269,8 @@ public class ServerData<I extends WritableComparable,
         } else if (originalVertex != null) {
           partition.removeVertex(vertexId);
           try {
-            getCurrentMessageStore().clearVertexMessages(vertexId);
+            getPartitionStore().getCurrentMessageStore()
+                .clearVertexMessages(vertexId);
           } catch (IOException e) {
             throw new IllegalStateException("resolvePartitionMutations: " +
                 "Caught IOException while clearing messages for a deleted " +
@@ -352,7 +283,7 @@ public class ServerData<I extends WritableComparable,
 
     // Keep track of vertices which are not here in the partition, but have
     // received messages
-    Iterable<I> destinations = getCurrentMessageStore().
+    Iterable<I> destinations = getPartitionStore().getCurrentMessageStore().
         getPartitionDestinationVertices(partitionId);
     if (!Iterables.isEmpty(destinations)) {
       for (I vertexId : destinations) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index dfb5683..29a0888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -191,13 +191,16 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @param <I> Vertex id
    * @param <M> Message data
    */
-  private static class Factory<I extends WritableComparable, M extends Writable>
+  public static class Factory<I extends WritableComparable, M extends Writable>
     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
+    /** Constructor for reflection */
+    public Factory() { }
+
     /**
      * @param service Worker service
      * @param config  Hadoop configuration

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
new file mode 100644
index 0000000..f974823
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Structure that keeps message information.
+ *
+ * @param <I> Vertex id
+ */
+public interface MessageData<I extends WritableComparable> {
+  /**
+   * Get message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   *
+   * @param <M> Message data type
+   * @return Incoming message store
+   */
+  <M extends Writable> MessageStore<I, M> getIncomingMessageStore();
+
+  /**
+   * Get message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
+   *
+   * @param <M> Message data type
+   * @return Current message store
+   */
+  <M extends Writable> MessageStore<I, M> getCurrentMessageStore();
+
+  /**
+   * Re-initialize message stores.
+   * Discards old values if any.
+
+   * @throws IOException
+   */
+  void resetMessageStores() throws IOException;
+
+  /**
+   * Adds messages for partition to current message store
+   *
+   * @param <M> Message data type
+   * @param partitionId Id of partition
+   * @param messages    Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  <M extends Writable> void addPartitionCurrentMessages(
+      int partitionId, VertexIdMessages<I, M> messages)
+      throws IOException;
+
+  /**
+   * Adds messages for partition to incoming message store
+   *
+   * @param <M> Message data type
+   * @param partitionId Id of partition
+   * @param messages    Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  <M extends Writable> void addPartitionIncomingMessages(
+      int partitionId, VertexIdMessages<I, M> messages)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 6f1179a..6e85ea3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -76,6 +76,14 @@ public interface MessageStore<I extends WritableComparable,
   boolean hasMessagesForVertex(I vertexId);
 
   /**
+   * Check if we have messages for some partition
+   *
+   * @param partitionId Id of partition which we want to check
+   * @return True iff we have messages for the given partition
+   */
+  boolean hasMessagesForPartition(int partitionId);
+
+  /**
    * Adds messages for partition
    *
    * @param partitionId Id of partition

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 37b8c05..a1d3625 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -211,6 +211,11 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public void clearAll() throws IOException {
     map.clear();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 3351051..b28d15b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 /**
@@ -130,6 +131,12 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !Iterables
+        .isEmpty(getMessageStore(partitionId).getDestinationVertices());
+  }
+
+  @Override
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
     PartitionDiskBackedMessageStore<I, M> messageStore =
         partitionMessageStores.get(partitionId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 0732079..e1e7a3f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -186,6 +186,11 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return map.get(partitionId).size() != 0;
+  }
+
+  @Override
   public Iterable<M> getVertexMessages(I vertexId) throws IOException {
     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
     if (dataInputOutput == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index a61536f..b172d24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -162,6 +162,11 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return map.get(partitionId).size() != 0;
+  }
+
+  @Override
   public Iterable<M> getVertexMessages(
       I vertexId) throws IOException {
     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index a8c19be..2fbc35c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -177,6 +177,11 @@ public class IntByteArrayMessageStore<M extends Writable>
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable<M> getVertexMessages(
       IntWritable vertexId) throws IOException {
     DataInputOutput dataInputOutput =

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 7a4ed09..3186224 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -138,6 +138,11 @@ public class IntFloatMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable<FloatWritable> getVertexMessages(
       IntWritable vertexId) throws IOException {
     Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 069face..6278c16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -139,6 +139,11 @@ public class LongDoubleMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable<DoubleWritable> getVertexMessages(
       LongWritable vertexId) throws IOException {
     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index 50e8818..d8e5246 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -101,6 +101,11 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public void clearVertexMessages(LongWritable vertexId) throws IOException {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
@@ -124,5 +129,4 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
     }
     return vertices;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 252ee39..e820f26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -142,6 +142,11 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return store.hasMessagesForPartition(partitionId);
+  }
+
+  @Override
   public void addPartitionMessages(
       int partitionId, VertexIdMessages<I, M> messages) throws IOException {
     int hash = partition2Queue.get(partitionId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 1cd1bd6..e9b072a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -210,7 +210,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
                                      Partition<I, V, E> partition) {
     final int partitionId = partition.getId();
     MessageStore<I, Writable> messageStore =
-        serverData.getCurrentMessageStore();
+        serverData.getPartitionStore().getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(
             configuration.createOutgoingMessageValueFactory());

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 37fb246..600cb1a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,11 +21,8 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -33,8 +30,6 @@ import org.apache.log4j.Logger;
 
 import java.net.InetSocketAddress;
 
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
-
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
  * the actual {@link ServerData}.
@@ -78,8 +73,7 @@ public class NettyWorkerServer<I extends WritableComparable,
     this.context = context;
 
     serverData =
-        new ServerData<I, V, E>(service, conf, createMessageStoreFactory(),
-            context);
+        new ServerData<I, V, E>(service, conf, context);
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
@@ -87,24 +81,6 @@ public class NettyWorkerServer<I extends WritableComparable,
     nettyServer.start();
   }
 
-  /**
-   * Decide which message store should be used for current application,
-   * and create the factory for that store
-   *
-   * @return Message store factory
-   */
-  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
-  createMessageStoreFactory() {
-    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
-        MESSAGE_STORE_FACTORY_CLASS.get(conf);
-
-    MessageStoreFactory messageStoreFactoryInstance =
-        ReflectionUtils.newInstance(messageStoreFactoryClass);
-    messageStoreFactoryInstance.initialize(service, conf);
-
-    return messageStoreFactoryInstance;
-  }
-
   @Override
   public InetSocketAddress getMyAddress() {
     return nettyServer.getMyAddress();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index b59d0cf..ab66aa3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -86,8 +86,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   @Override
   public void doRequest(ServerData<I, V, E> serverData) {
     try {
-      serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
-          vertexIdMessageMap);
+      serverData.getPartitionStore()
+          .addPartitionCurrentMessages(partitionId, vertexIdMessageMap);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 6953998..e9be327 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -71,8 +71,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
     while (iterator.hasNext()) {
       iterator.next();
       try {
-        serverData.getIncomingMessageStore().
-            addPartitionMessages(iterator.getCurrentFirst(),
+        serverData.getPartitionStore().
+            addPartitionIncomingMessages(iterator.getCurrentFirst(),
                 iterator.getCurrentSecond());
       } catch (IOException e) {
         throw new RuntimeException("doRequest: Got IOException ", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
index f8d0473..aeb1b1d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -27,7 +27,6 @@ import java.util.Map.Entry;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
@@ -94,10 +93,11 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
   @Override
   public void doRequest(ServerData serverData) {
     try {
-      MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
-      if (messageStore.isPointerListEncoding()) {
+      if (serverData.getPartitionStore().getIncomingMessageStore()
+          .isPointerListEncoding()) {
         // if message store is pointer list based then send data as is
-        messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+        serverData.getPartitionStore()
+            .addPartitionIncomingMessages(-1, oneMessageToManyIds);
       } else { // else split the data per partition and send individually
         CentralizedServiceWorker<I, ?, ?> serviceWorker =
             serverData.getServiceWorker();
@@ -144,8 +144,9 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
         for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
             partitionIdMsgs.entrySet()) {
           if (!idMsgs.getValue().isEmpty()) {
-            serverData.getIncomingMessageStore().addPartitionMessages(
-                idMsgs.getKey(), idMsgs.getValue());
+            serverData.getPartitionStore()
+                .addPartitionIncomingMessages(idMsgs.getKey(),
+                    idMsgs.getValue());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 923e427..fa268da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -103,6 +103,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
 
   /**
    * Constructor
+   *
    * @param context Context
    * @param graphState Current graph state (use to create own graph state)
    * @param messageStore Message store

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 0844858..3c09957 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -332,7 +332,8 @@ end[PURE_YARN]*/
       prepareForSuperstep(graphState);
       context.progress();
       MessageStore<I, Writable> messageStore =
-        serviceWorker.getServerData().getCurrentMessageStore();
+          serviceWorker.getServerData().getPartitionStore()
+              .getCurrentMessageStore();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
       int numThreads = Math.min(numComputeThreads, numPartitions);
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
index 8d3cab6..749e41e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
@@ -79,15 +79,21 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
   // ---- OOC Commands ----
   /**
    * List of partitions that are on disk, and their loaded *vertices* during
-   * INPUT_SUPERSTEP are ready to flush to disk
+   * INPUT_SUPERSTEP are ready to be flushed to disk
    */
   private final BlockingQueue<Integer> partitionsWithInputVertices;
   /**
    * List of partitions that are on disk, and their loaded *edges* during
-   * INPUT_SUPERSTEP are ready to flush to disk
+   * INPUT_SUPERSTEP are ready to be flushed to disk
    */
   private final BlockingQueue<Integer> partitionsWithInputEdges;
-  /** Number of partitions to be written to the disk */
+  /**
+   * List of partitions that are on disk, and their message buffers (either
+   * messages for current superstep, or incoming messages for next superstep)
+   * are ready to be flushed to disk
+   */
+  private final BlockingQueue<Integer> partitionsWithPendingMessages;
+  /** Number of partitions to be written to disk */
   private final AtomicInteger numPartitionsToSpill;
 
   /** Executor service for check memory thread */
@@ -128,6 +134,7 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
     this.done = false;
     this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100);
     this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100);
+    this.partitionsWithPendingMessages = new ArrayBlockingQueue<Integer>(100);
     this.numPartitionsToSpill = new AtomicInteger(0);
   }
 
@@ -217,6 +224,13 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
   }
 
   /**
+   * @return list of partitions that have large enough message buffers.
+   */
+  public BlockingQueue<Integer> getPartitionsWithPendingMessages() {
+    return partitionsWithPendingMessages;
+  }
+
+  /**
    * @return number of partitions to spill to disk
    */
   public AtomicInteger getNumPartitionsToSpill() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
index 7f52490..2b2c990 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
@@ -232,7 +232,6 @@ public class CheckMemoryCallable<I extends WritableComparable,
                   freeMemory));
         }
       }
-
       // If we have enough memory, we roll back the latest shrink in number of
       // partition slots.
       // If we do not have enough memory, but we are not in a bad scenario
@@ -265,9 +264,10 @@ public class CheckMemoryCallable<I extends WritableComparable,
             oocEngine.getPartitionsWithInputVertices();
         BlockingQueue<Integer> partitionsWithInputEdges =
             oocEngine.getPartitionsWithInputEdges();
+        BlockingQueue<Integer> partitionsWithPendingMessages =
+            oocEngine.getPartitionsWithPendingMessages();
         AtomicInteger numPartitionsToSpill =
             oocEngine.getNumPartitionsToSpill();
-
         while (freeMemory < midFreeMemoryFraction * maxMemory) {
           // Offload input vertex buffer of OOC partitions if we are in
           // INPUT_SUPERSTEP
@@ -276,7 +276,7 @@ public class CheckMemoryCallable<I extends WritableComparable,
             // vertex buffers of that partition).
             PairList<Integer, Integer> pairs =
                 partitionStore.getOocPartitionIdsWithPendingInputVertices();
-            freeMemory -= createCommand(pairs, partitionsWithInputVertices);
+            freeMemory -= createCommands(pairs, partitionsWithInputVertices);
           }
 
           // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP
@@ -284,7 +284,15 @@ public class CheckMemoryCallable<I extends WritableComparable,
               serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
             PairList<Integer, Integer> pairs =
                 partitionStore.getOocPartitionIdsWithPendingInputEdges();
-            freeMemory -= createCommand(pairs, partitionsWithInputEdges);
+            freeMemory -= createCommands(pairs, partitionsWithInputEdges);
+          }
+
+          // Offload message buffers of OOC partitions if we are still low in
+          // free memory
+          if (freeMemory < midFreeMemoryFraction * maxMemory) {
+            PairList<Integer, Integer> pairs =
+                partitionStore.getOocPartitionIdsWithPendingMessages();
+            freeMemory -= createCommands(pairs, partitionsWithPendingMessages);
           }
 
           // Offload partitions if we are still low in free memory
@@ -295,14 +303,16 @@ public class CheckMemoryCallable<I extends WritableComparable,
 
           if (!partitionsWithInputVertices.isEmpty() ||
               !partitionsWithInputEdges.isEmpty() ||
+              !partitionsWithPendingMessages.isEmpty() ||
               numPartitionsToSpill.get() != 0) {
             if (LOG.isInfoEnabled()) {
               LOG.info("call: signal out-of-core processor threads to start " +
-                  "offloading. These threads will spill vertex buffer of " +
+                  "offloading. These threads will spill vertex buffers of " +
                   partitionsWithInputVertices.size() + " partitions, edge " +
                   "buffers of " + partitionsWithInputEdges.size() +
-                  " partitions, and " + numPartitionsToSpill.get() + " whole " +
-                  "partition");
+                  " partitions, pending message buffers of " +
+                  partitionsWithPendingMessages.size() + " partitions, and " +
+                  numPartitionsToSpill.get() + " whole partitions");
             }
             // Opening the gate for OOC processing threads to start spilling
             // data on disk
@@ -356,7 +366,7 @@ public class CheckMemoryCallable<I extends WritableComparable,
           if (LOG.isInfoEnabled()) {
             LOG.info("call: " +
                 (gcDone ?
-                    ("GC is done. " + String.format("GC time = %.2f sec.",
+                    ("GC is done. " + String.format("GC time = %.2f sec. ",
                         (System.currentTimeMillis() - gcStartTime) / 1000.0)) :
                     "") +
                 "Finished offloading data to disk. " +
@@ -432,18 +442,20 @@ public class CheckMemoryCallable<I extends WritableComparable,
   }
 
   /**
-   * Generates the command for a particular type of data we want to offload to
-   * disk.
+   * Generate commands for out-of-core processor threads based on the
+   * (partitionId, memory foot-print) pairs we have on a particular type of data
+   * (either vertex buffer, edge buffer, or message buffer).
    *
-   * @param pairs list of pair(partitionId, approximate foot-print that is going
-   *              of be reduced by offloading the particular data of a
+   * @param pairs list of pairs (partitionId, estimated memory foot-print that
+   *              is going to be reduced by offloading the particular data of a
    *              partition)
-   * @param commands list of partitionIds for which we want to execute the
-   *                 command
+   * @param commands commands to generate for out-of-core processor threads. a
+   *                 command is a partition id, for which the appropriate data
+   *                 should be flushed to disk.
    * @return approximate amount of memory (in MB) that is going to be freed up
    *         after executing the generated commands
    */
-  private double createCommand(PairList<Integer, Integer> pairs,
+  private double createCommands(PairList<Integer, Integer> pairs,
       BlockingQueue<Integer> commands) {
     double usedMemory = 0;
     if (pairs.getSize() != 0) {


Mime
View raw message