giraph-commits mailing list archives

From: ach...@apache.org
Subject: svn commit: r1369508 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/graph/par...
Date: Sun, 05 Aug 2012 00:17:14 GMT
Author: aching
Date: Sun Aug  5 00:17:12 2012
New Revision: 1369508

URL: http://svn.apache.org/viewvc?rev=1369508&view=rev
Log:
GIRAPH-45: Improve the way to keep outgoing messages (majakabiljo via
aching).
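
For anyone trying the change out, a minimal sketch of switching a job over to
the new out-of-core message path. It only touches the two GiraphJob
configuration constants referenced in the NettyWorkerServer hunk below; the
threshold value is an arbitrary example, not a recommendation.

  import org.apache.giraph.graph.GiraphJob;
  import org.apache.hadoop.conf.Configuration;

  public class OutOfCoreMessagingSetup {
    /** Enable the disk-backed message store chain for a Giraph job. */
    public static void enableOutOfCoreMessaging(Configuration conf) {
      // Replaces the default SimpleMessageStore with
      // DiskBackedMessageStoreByPartition (see NettyWorkerServer below).
      conf.setBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES, true);
      // Approximate number of messages kept in memory before the largest
      // per-partition store is flushed to disk.
      conf.setInt(GiraphJob.MAX_MESSAGES_IN_MEMORY, 1000000);
    }
  }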


Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sun Aug  5 00:17:12 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+
+  GIRAPH-45: Improve the way to keep outgoing messages (majakabiljo
+  via aching).
  
   GIRAPH-271: Regression in imports in CommunicationsInterface (netj
   via aching).

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Sun Aug  5 00:17:12 2012
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.giraph.comm.ServerData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -169,4 +171,11 @@ public interface CentralizedServiceWorke
    * Operations that will be called if there is a failure by a worker.
    */
   void failureCleanup();
+
+  /**
+   * Get server data
+   *
+   * @return Server data
+   */
+  ServerData<I, V, E, M> getServerData();
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sun Aug  5 00:17:12 2012
@@ -1238,11 +1238,16 @@ public abstract class BasicRPCCommunicat
       }
       VertexMutations<I, V, E, M> vertexMutations =
           inVertexMutationsMap.get(vertexIndex);
+      boolean receivedMessages =
+          messages != null && !Iterables.isEmpty(messages);
       Vertex<I, V, E, M> vertex =
           vertexResolver.resolve(vertexIndex,
               originalVertex,
               vertexMutations,
-              messages);
+              receivedMessages);
+      if (vertex != null && receivedMessages) {
+        service.assignMessagesToVertex(vertex, messages);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("prepareSuperstep: Resolved vertex index " +
             vertexIndex + " with original vertex " +

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Sun Aug  5 00:17:12 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactor
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -97,6 +98,8 @@ public class NettyServer<I extends Writa
         new SendPartitionMessagesRequest<I, V, E, M>());
     requestRegistry.registerClass(
         new SendPartitionMutationsRequest<I, V, E, M>());
+    requestRegistry.registerClass(
+        new SendPartitionCurrentMessagesRequest<I, V, E, M>());
     requestRegistry.shutdown();
 
     ThreadFactory bossFactory = new ThreadFactoryBuilder()

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Sun Aug  5 00:17:12 2012
@@ -19,6 +19,8 @@
 package org.apache.giraph.comm;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
@@ -33,6 +35,7 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -202,10 +205,40 @@ public class NettyWorkerClient<I extends
           " from " + workerInfo + ", with partition " + partition);
     }
 
-    WritableRequest<I, V, E, M> writableReauest =
-        new SendVertexRequest<I, V, E, M>(
-            partition.getId(), partition.getVertices());
-    nettyClient.sendWritableRequest(remoteServerAddress, writableReauest);
+    int partitionId = partition.getId();
+    WritableRequest<I, V, E, M> vertexRequest =
+        new SendVertexRequest<I, V, E, M>(partitionId,
+            partition.getVertices());
+    nettyClient.sendWritableRequest(remoteServerAddress, vertexRequest);
+
+    // messages are stored separately
+    MessageStoreByPartition<I, M> messageStore =
+        service.getServerData().getCurrentMessageStore();
+    Map<I, Collection<M>> map = Maps.newHashMap();
+    int messagesInMap = 0;
+    for (I vertexId :
+        messageStore.getPartitionDestinationVertices(partitionId)) {
+      try {
+        Collection<M> messages = messageStore.getVertexMessages(vertexId);
+        map.put(vertexId, messages);
+        messagesInMap += messages.size();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "sendPartitionReq: Got IOException ", e);
+      }
+      if (messagesInMap > maxMessagesPerPartition) {
+        WritableRequest<I, V, E, M> messagesRequest = new
+            SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+        nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+        map.clear();
+        messagesInMap = 0;
+      }
+    }
+    if (!map.isEmpty()) {
+      WritableRequest<I, V, E, M> messagesRequest = new
+          SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+      nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+    }
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java Sun Aug  5 00:17:12 2012
@@ -131,6 +131,11 @@ public class NettyWorkerClientServer<I e
   }
 
   @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return server.getServerData();
+  }
+
+  @Override
   public void close() {
     server.close();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java Sun Aug  5 00:17:12 2012
@@ -19,7 +19,16 @@
 package org.apache.giraph.comm;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.BasicMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
+import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
@@ -29,12 +38,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 /**
@@ -72,7 +79,27 @@ public class NettyWorkerServer<I extends
       CentralizedServiceWorker<I, V, E, M> service) {
     this.conf = conf;
     this.service = service;
-    serverData = new ServerData<I, V, E, M>(conf);
+
+    boolean useOutOfCoreMessaging = conf.getBoolean(
+        GiraphJob.USE_OUT_OF_CORE_MESSAGES,
+        GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+    if (!useOutOfCoreMessaging) {
+      serverData = new ServerData<I, V, E, M>(
+          SimpleMessageStore.newFactory(service, conf));
+    } else {
+      int maxMessagesInMemory = conf.getInt(GiraphJob.MAX_MESSAGES_IN_MEMORY,
+          GiraphJob.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
+          SequentialFileMessageStore.newFactory(conf);
+      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+          partitionStoreFactory =
+          DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
+      MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+          storeFactory = DiskBackedMessageStoreByPartition.newFactory(service,
+              maxMessagesInMemory, partitionStoreFactory);
+      serverData = new ServerData<I, V, E, M>(storeFactory);
+    }
+
     nettyServer = new NettyServer<I, V, E, M>(conf, serverData);
     nettyServer.start();
   }
@@ -84,30 +111,22 @@ public class NettyWorkerServer<I extends
 
   @Override
   public void prepareSuperstep() {
-    // Assign the in messages to the vertices and keep track of the messages
-    // that have no vertex here to be resolved later.  Ensure messages are
-    // being sent to the right worker.
+    serverData.prepareSuperstep();
+
     Set<I> resolveVertexIndexSet = Sets.newHashSet();
-    for (Entry<I, Collection<M>> entry :
-        serverData.getTransientMessages().entrySet()) {
-      synchronized (entry.getValue()) {
-        if (entry.getValue().isEmpty()) {
-          continue;
+    // Keep track of the vertices which are not here but have received messages
+    for (I vertexId :
+        serverData.getCurrentMessageStore().getDestinationVertices()) {
+      Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
+      if (vertex == null) {
+        if (service.getPartition(vertexId) == null) {
+          throw new IllegalStateException(
+              "prepareSuperstep: No partition for vertex index " + vertexId);
         }
-        Vertex<I, V, E, M> vertex = service.getVertex(entry.getKey());
-        if (vertex == null) {
-          if (service.getPartition(entry.getKey()) == null) {
-            throw new IllegalStateException("prepareSuperstep: No partition " +
-                "for vertex index " + entry.getKey());
-          }
-          if (!resolveVertexIndexSet.add(entry.getKey())) {
-            throw new IllegalStateException(
-                "prepareSuperstep: Already has missing vertex on this " +
-                    "worker for " + entry.getKey());
-          }
-        } else {
-          service.assignMessagesToVertex(vertex, entry.getValue());
-          entry.getValue().clear();
+        if (!resolveVertexIndexSet.add(vertexId)) {
+          throw new IllegalStateException(
+              "prepareSuperstep: Already has missing vertex on this " +
+                  "worker for " + vertexId);
         }
       }
     }
@@ -128,19 +147,6 @@ public class NettyWorkerServer<I extends
               conf, service.getGraphMapper().getGraphState());
       Vertex<I, V, E, M> originalVertex =
           service.getVertex(vertexIndex);
-      Iterable<M> messages = null;
-      if (originalVertex != null) {
-        messages = originalVertex.getMessages();
-      } else {
-        Collection<M> transientMessages =
-            serverData.getTransientMessages().get(vertexIndex);
-        if (transientMessages != null) {
-          synchronized (transientMessages) {
-            messages = Lists.newArrayList(transientMessages);
-          }
-          serverData.getTransientMessages().remove(vertexIndex);
-        }
-      }
 
       VertexMutations<I, V, E, M> mutations = null;
       VertexMutations<I, V, E, M> vertexMutations =
@@ -152,7 +158,9 @@ public class NettyWorkerServer<I extends
         serverData.getVertexMutations().remove(vertexIndex);
       }
       Vertex<I, V, E, M> vertex = vertexResolver.resolve(
-          vertexIndex, originalVertex, mutations, messages);
+          vertexIndex, originalVertex, mutations,
+          serverData.getCurrentMessageStore().
+              hasMessagesForVertex(vertexIndex));
       if (LOG.isDebugEnabled()) {
         LOG.debug("prepareSuperstep: Resolved vertex index " +
             vertexIndex + " with original vertex " +
@@ -177,8 +185,6 @@ public class NettyWorkerServer<I extends
       }
     }
 
-    serverData.getTransientMessages().clear();
-
     if (!serverData.getVertexMutations().isEmpty()) {
       throw new IllegalStateException("prepareSuperstep: Illegally " +
           "still has " + serverData.getVertexMutations().size() +
@@ -193,6 +199,11 @@ public class NettyWorkerServer<I extends
   }
 
   @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return serverData;
+  }
+
+  @Override
   public void close() {
     nettyServer.stop();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Sun Aug  5 00:17:12 2012
@@ -81,7 +81,6 @@ public class RPCCommunications<I extends
    * @param service Server worker.
    * @param graphState Graph state from infrastructure.
    * @throws IOException
-   * @throws UnknownHostException
    * @throws InterruptedException
    */
   public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
@@ -203,4 +202,10 @@ public class RPCCommunications<I extends
       });
     /*end[HADOOP_NON_SASL_RPC]*/
   }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    throw new IllegalStateException(
+        "getServerData: Tried to get ServerData while using RPC");
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java Sun Aug  5 00:17:12 2012
@@ -38,10 +38,17 @@ public class RequestRegistry {
   public enum Type {
     /** Sending vertices request */
     SEND_VERTEX_REQUEST,
-    /** Sending a partition of messages */
+    /** Sending a partition of messages for next superstep */
     SEND_PARTITION_MESSAGES_REQUEST,
+    /**
+     * Sending a partition of messages for current superstep
+     * (used during partition exchange)
+     */
+    SEND_PARTITION_CURRENT_MESSAGES_REQUEST,
     /** Send a partition of mutations */
     SEND_PARTITION_MUTATIONS_REQUEST,
+    /** Sending messages request */
+    SEND_MESSAGES_REQUEST,
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java Sun Aug  5 00:17:12 2012
@@ -35,7 +35,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Send a collection of vertex messages for a partition.
@@ -118,32 +117,11 @@ public class SendPartitionMessagesReques
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    ConcurrentHashMap<I, Collection<M>> transientMessages =
-      serverData.getTransientMessages();
-    for (Entry<I, Collection<M>> entry : vertexIdMessages.entrySet()) {
-      Collection<M> messages = transientMessages.get(entry.getKey());
-      if (messages == null) {
-        final Collection<M> tmpMessages =
-            Lists.newArrayListWithCapacity(entry.getValue().size());
-        messages = transientMessages.putIfAbsent(entry.getKey(), tmpMessages);
-        if (messages == null) {
-          messages = tmpMessages;
-        }
-      }
-      synchronized (messages) {
-        messages.addAll(entry.getValue());
-        if (serverData.getCombiner() != null) {
-          try {
-            messages = Lists.newArrayList(
-                serverData.getCombiner().combine(entry.getKey(), messages));
-          } catch (IOException e) {
-            throw new IllegalStateException(
-                "doRequest: Combiner failed to combine messages " + messages,
-                e);
-          }
-          transientMessages.put(entry.getKey(), messages);
-        }
-      }
+    try {
+      serverData.getIncomingMessageStore().addPartitionMessages(
+          vertexIdMessages, partitionId);
+    } catch (IOException e) {
+      throw new RuntimeException("doRequest: Got IOException ", e);
     }
   }
 
@@ -156,4 +134,22 @@ public class SendPartitionMessagesReques
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
+
+  /**
+   * Get id of partition
+   *
+   * @return Partition id
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * Get messages
+   *
+   * @return Messages map
+   */
+  public Map<I, Collection<M>> getVertexIdMessages() {
+    return vertexIdMessages;
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Sun Aug  5 00:17:12 2012
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexMutations;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -40,8 +40,6 @@ import java.util.concurrent.ConcurrentHa
 @SuppressWarnings("rawtypes")
 public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
-  /** Combiner instance, can be null */
-  private VertexCombiner<I, M> combiner;
   /**
    * Map of partition ids to incoming vertices from other workers.
    * (Synchronized on values)
@@ -49,15 +47,20 @@ public class ServerData<I extends Writab
   private final ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
   inPartitionVertexMap =
       new ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>();
+
+  /** Message store factory */
+  private final
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
+  /**
+   * Message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   */
+  private volatile MessageStoreByPartition<I, M> incomingMessageStore;
   /**
-   * Map of inbound messages, mapping from vertex index to list of messages.
-   * Transferred to inMessages at beginning of a superstep.  This
-   * intermediary step exists so that the combiner will run not only at the
-   * client, but also at the server. Also, allows the sending of large
-   * message lists during the superstep computation. (Synchronized on values)
+   * Message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
    */
-  private final ConcurrentHashMap<I, Collection<M>> transientMessages =
-      new ConcurrentHashMap<I, Collection<M>>();
+  private volatile MessageStoreByPartition<I, M> currentMessageStore;
   /**
    * Map of partition ids to incoming vertex mutations from other workers.
    * (Synchronized access to values)
@@ -65,16 +68,13 @@ public class ServerData<I extends Writab
   private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
   vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
 
-  /**
-   * Constructor.
-   * @param conf Configuration (used to instantiate the combiner).
-   */
-  public ServerData(Configuration conf) {
-    if (BspUtils.getVertexCombinerClass(conf) == null) {
-      combiner = null;
-    } else {
-      combiner = BspUtils.createVertexCombiner(conf);
-    }
+  /** @param messageStoreFactory Factory for message stores */
+  public ServerData(MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+      messageStoreFactory) {
+
+    this.messageStoreFactory = messageStoreFactory;
+    currentMessageStore = messageStoreFactory.newStore();
+    incomingMessageStore = messageStoreFactory.newStore();
   }
 
   /**
@@ -88,12 +88,37 @@ public class ServerData<I extends Writab
   }
 
   /**
-   * Get the vertex messages (synchronize on the values)
+   * Get message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   *
+   * @return Incoming message store
+   */
+  public MessageStoreByPartition<I, M> getIncomingMessageStore() {
+    return incomingMessageStore;
+  }
+
+  /**
+   * Get message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
    *
-   * @return Vertex messages
+   * @return Current message store
    */
-  public ConcurrentHashMap<I, Collection<M>> getTransientMessages() {
-    return transientMessages;
+  public MessageStoreByPartition<I, M> getCurrentMessageStore() {
+    return currentMessageStore;
+  }
+
+  /** Prepare for next super step */
+  public void prepareSuperstep() {
+    if (currentMessageStore != null) {
+      try {
+        currentMessageStore.clearAll();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Failed to clear previous message store");
+      }
+    }
+    currentMessageStore = incomingMessageStore;
+    incomingMessageStore = messageStoreFactory.newStore();
   }
 
   /**
@@ -105,12 +130,4 @@ public class ServerData<I extends Writab
   getVertexMutations() {
     return vertexMutations;
   }
-
-  /**
-   * Get the combiner instance.
-   * @return The combiner.
-   */
-  public VertexCombiner<I, M> getCombiner() {
-    return combiner;
-  }
 }
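
The incoming/current split above is double buffering across supersteps:
request handlers write into the incoming store during superstep N, and
prepareSuperstep() promotes it to the current store that superstep N+1 reads
from. A minimal sketch of that lifecycle against the ServerData API (the
Writable types and the vertex id are arbitrary choices for illustration):

  import java.io.IOException;
  import java.util.Collection;
  import java.util.Collections;

  import org.apache.giraph.comm.ServerData;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.FloatWritable;
  import org.apache.hadoop.io.LongWritable;

  public class MessageStoreLifecycleSketch {
    /** One superstep boundary seen through the two message stores. */
    public static void supersteps(
        ServerData<LongWritable, DoubleWritable,
            FloatWritable, DoubleWritable> serverData) throws IOException {
      LongWritable vertexId = new LongWritable(1);
      // Superstep N: a message destined for the next superstep lands in
      // the incoming store.
      serverData.getIncomingMessageStore().addVertexMessages(
          vertexId, Collections.singleton(new DoubleWritable(3.14)));
      // Superstep boundary: clear the consumed store, swap incoming -> current.
      serverData.prepareSuperstep();
      // Superstep N+1: the worker reads the message from the current store.
      Collection<DoubleWritable> messages =
          serverData.getCurrentMessageStore().getVertexMessages(vertexId);
      // messages now holds the value sent above.
    }
  }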

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Sun Aug  5 00:17:12 2012
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.comm;
 
-import java.io.Closeable;
-import java.util.Collection;
-import java.util.Map;
-
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+
 /**
  * Interface for message communication server.
  *
@@ -61,6 +61,13 @@ public interface WorkerServer<I extends 
   Map<Integer, Collection<Vertex<I, V, E, M>>> getInPartitionVertexMap();
 
   /**
+   * Get server data
+   *
+   * @return Server data
+   */
+  ServerData<I, V, E, M> getServerData();
+
+  /**
    * Shuts down.
    */
   void close();

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Most basic message store with just add, get and clear operations
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public interface BasicMessageStore<I extends WritableComparable,
+    M extends Writable> extends Writable {
+  /**
+   * Adds messages
+   *
+   * @param messages Map of messages we want to add
+   * @throws IOException
+   */
+  void addMessages(Map<I, Collection<M>> messages) throws IOException;
+
+  /**
+   * Gets messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Messages for vertex with required id
+   * @throws IOException
+   */
+  Collection<M> getVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears all resources used by this store.
+   *
+   * @throws IOException
+   */
+  void clearAll() throws IOException;
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.utils.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Message storage with in memory map of messages and with support for
+ * flushing all the messages to the disk.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStore<I extends WritableComparable,
+    M extends Writable> implements FlushableMessageStore<I, M> {
+  /** In memory message map */
+  private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
+  /** Hadoop configuration */
+  private final Configuration config;
+  /** Combiner for messages */
+  private final VertexCombiner<I, M> combiner;
+  /** Counter for number of messages in memory */
+  private final AtomicInteger numberOfMessagesInMemory;
+  /** To keep vertex ids which we have messages for */
+  private final Set<I> destinationVertices;
+  /** File stores in which we keep flushed messages */
+  private final Collection<BasicMessageStore<I, M>> fileStores;
+  /** Factory for creating file stores when flushing */
+  private final
+  MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
+  /** Lock for disk flushing */
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
+  /**
+   * @param combiner         Combiner for messages
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public DiskBackedMessageStore(VertexCombiner<I, M> combiner,
+      Configuration config,
+      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
+    inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+    this.config = config;
+    this.combiner = combiner;
+    numberOfMessagesInMemory = new AtomicInteger(0);
+    destinationVertices =
+        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
+    fileStores = Lists.newArrayList();
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  @Override
+  public void addVertexMessages(I vertexId,
+      Collection<M> messages) throws IOException {
+    destinationVertices.add(vertexId);
+
+    rwLock.readLock().lock();
+    try {
+      Collection<M> currentMessages =
+          CollectionUtils.addConcurrent(vertexId, messages, inMemoryMessages);
+      if (combiner != null) {
+        synchronized (currentMessages) {
+          numberOfMessagesInMemory.addAndGet(
+              messages.size() - currentMessages.size());
+          currentMessages =
+              Lists.newArrayList(combiner.combine(vertexId, currentMessages));
+          inMemoryMessages.put(vertexId, currentMessages);
+          numberOfMessagesInMemory.addAndGet(currentMessages.size());
+        }
+      } else {
+        numberOfMessagesInMemory.addAndGet(messages.size());
+      }
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
+    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+      addVertexMessages(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+    Collection<M> messages = inMemoryMessages.get(vertexId);
+    if (messages == null) {
+      messages = Lists.newArrayList();
+    }
+    for (BasicMessageStore<I, M> fileStore : fileStores) {
+      messages.addAll(fileStore.getVertexMessages(vertexId));
+    }
+    return messages;
+  }
+
+  @Override
+  public int getNumberOfMessages() {
+    return numberOfMessagesInMemory.get();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return destinationVertices.contains(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getDestinationVertices() {
+    return destinationVertices;
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    inMemoryMessages.remove(vertexId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    inMemoryMessages.clear();
+    destinationVertices.clear();
+    for (BasicMessageStore<I, M> fileStore : fileStores) {
+      fileStore.clearAll();
+    }
+    fileStores.clear();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    ConcurrentNavigableMap<I, Collection<M>> messagesToFlush = null;
+    rwLock.writeLock().lock();
+    try {
+      messagesToFlush = inMemoryMessages;
+      inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+      numberOfMessagesInMemory.set(0);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+    fileStore.addMessages(messagesToFlush);
+    synchronized (fileStores) {
+      fileStores.add(fileStore);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write destination vertices
+    out.writeInt(destinationVertices.size());
+    for (I vertexId : destinationVertices) {
+      vertexId.write(out);
+    }
+
+    // write in memory messages map
+    out.writeInt(inMemoryMessages.size());
+    for (Entry<I, Collection<M>> entry : inMemoryMessages.entrySet()) {
+      entry.getKey().write(out);
+      out.writeInt(entry.getValue().size());
+      for (M message : entry.getValue()) {
+        message.write(out);
+      }
+    }
+
+    // write file stores
+    out.writeInt(fileStores.size());
+    for (BasicMessageStore<I, M> fileStore : fileStores) {
+      fileStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read destination vertices
+    int numVertices = in.readInt();
+    for (int v = 0; v < numVertices; v++) {
+      I vertexId = BspUtils.<I>createVertexId(config);
+      vertexId.readFields(in);
+      destinationVertices.add(vertexId);
+    }
+
+    // read in memory map
+    int mapSize = in.readInt();
+    for (int m = 0; m < mapSize; m++) {
+      I vertexId = BspUtils.<I>createVertexId(config);
+      vertexId.readFields(in);
+      int numMessages = in.readInt();
+      numberOfMessagesInMemory.addAndGet(numMessages);
+      List<M> messages = Lists.newArrayList();
+      for (int i = 0; i < numMessages; i++) {
+        M message = BspUtils.<M>createMessageValue(config);
+        message.readFields(in);
+        messages.add(message);
+      }
+      inMemoryMessages.put(vertexId, messages);
+    }
+
+    // read file stores
+    int numFileStores = in.readInt();
+    for (int s = 0; s < numFileStores; s++) {
+      BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+      fileStore.readFields(in);
+      fileStores.add(fileStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating message stores for
+   *                         partitions
+   * @param <I>              Vertex id
+   * @param <M>              Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
+      Configuration config,
+      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
+    return new Factory<I, M>(config, fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link DiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable> implements MessageStoreFactory<I, M,
+      FlushableMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final Configuration config;
+    /** Combiner for messages */
+    private final VertexCombiner<I, M> combiner;
+    /** Factory for creating message stores for partitions */
+    private final
+    MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
+
+    /**
+     * @param config           Hadoop configuration
+     * @param fileStoreFactory Factory for creating message stores for
+     *                         partitions
+     */
+    public Factory(Configuration config,
+        MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
+      this.config = config;
+      if (BspUtils.getVertexCombinerClass(config) == null) {
+        combiner = null;
+      } else {
+        combiner = BspUtils.createVertexCombiner(config);
+      }
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public FlushableMessageStore<I, M> newStore() {
+      return new DiskBackedMessageStore<I, M>(combiner, config,
+          fileStoreFactory);
+    }
+  }
+}
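
A minimal sketch of exercising DiskBackedMessageStore on its own, wired the
same way NettyWorkerServer composes it above. It assumes conf is the job's
Configuration (so the sequential file store can materialize message values
when spilled data is read back); the types and ids are arbitrary examples.

  import java.io.IOException;
  import java.util.Collections;

  import org.apache.giraph.comm.messages.BasicMessageStore;
  import org.apache.giraph.comm.messages.DiskBackedMessageStore;
  import org.apache.giraph.comm.messages.FlushableMessageStore;
  import org.apache.giraph.comm.messages.MessageStoreFactory;
  import org.apache.giraph.comm.messages.SequentialFileMessageStore;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.LongWritable;

  public class DiskBackedStoreSketch {
    /** Buffer a message in memory, spill it, then read it back. */
    public static void sketch(Configuration conf) throws IOException {
      MessageStoreFactory<LongWritable, DoubleWritable,
          BasicMessageStore<LongWritable, DoubleWritable>> fileStoreFactory =
          SequentialFileMessageStore.newFactory(conf);
      FlushableMessageStore<LongWritable, DoubleWritable> store =
          DiskBackedMessageStore.<LongWritable, DoubleWritable>newFactory(
              conf, fileStoreFactory).newStore();
      // Held in the in-memory ConcurrentSkipListMap (combined if a
      // VertexCombiner is configured).
      store.addVertexMessages(new LongWritable(1),
          Collections.singleton(new DoubleWritable(1.0)));
      // Spill the whole in-memory map into a new file-backed store.
      store.flush();
      // Reads merge the (now empty) memory map with every spill file.
      store.getVertexMessages(new LongWritable(1));
    }
  }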

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Message store which separates data by partitions,
+ * and submits them to underlying message store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MessageStoreByPartition<I, M> {
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+  /** Number of messages to keep in memory */
+  private final int maxNumberOfMessagesInMemory;
+  /** Factory for creating file stores when flushing */
+  private final
+  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
+  /** Map from partition id to its message store */
+  private final
+  ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
+
+  /**
+   * @param service                     Service worker
+   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory            Factory for creating file stores
+   *                                    when flushing
+   */
+  public DiskBackedMessageStoreByPartition(
+      CentralizedServiceWorker<I, V, E, M> service,
+      int maxNumberOfMessagesInMemory,
+      MessageStoreFactory<I, M, FlushableMessageStore<I,
+          M>> fileStoreFactory) {
+    this.service = service;
+    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
+    this.fileStoreFactory = fileStoreFactory;
+    partitionMessageStores = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public void addVertexMessages(I vertexId,
+      Collection<M> messages) throws IOException {
+    getMessageStore(vertexId).addVertexMessages(vertexId, messages);
+    checkMemory();
+  }
+
+  @Override
+  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
+    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+      getMessageStore(entry.getKey()).addVertexMessages(
+          entry.getKey(), entry.getValue());
+    }
+    checkMemory();
+  }
+
+  @Override
+  public void addPartitionMessages(Map<I, Collection<M>> messages,
+      int partitionId) throws IOException {
+    getMessageStore(partitionId).addMessages(messages);
+    checkMemory();
+  }
+
+  @Override
+  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+    if (hasMessagesForVertex(vertexId)) {
+      return getMessageStore(vertexId).getVertexMessages(vertexId);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  @Override
+  public int getNumberOfMessages() {
+    int numOfMessages = 0;
+    for (FlushableMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      numOfMessages += messageStore.getNumberOfMessages();
+    }
+    return numOfMessages;
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getDestinationVertices() {
+    List<I> vertices = Lists.newArrayList();
+    for (FlushableMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      Iterables.addAll(vertices, messageStore.getDestinationVertices());
+    }
+    return vertices;
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    FlushableMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore == null) {
+      return Collections.emptyList();
+    } else {
+      return messageStore.getDestinationVertices();
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    if (hasMessagesForVertex(vertexId)) {
+      getMessageStore(vertexId).clearVertexMessages(vertexId);
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    FlushableMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      messageStore.clearAll();
+    }
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    for (FlushableMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      messageStore.clearAll();
+    }
+    partitionMessageStores.clear();
+  }
+
+  /**
+   * Checks the memory status, flushes if necessary
+   *
+   * @throws IOException
+   */
+  private void checkMemory() throws IOException {
+    while (memoryFull()) {
+      flushOnePartition();
+    }
+  }
+
+  /**
+   * Check if memory is full
+   *
+   * @return True iff memory is full
+   */
+  private boolean memoryFull() {
+    int totalMessages = 0;
+    for (FlushableMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      totalMessages += messageStore.getNumberOfMessages();
+    }
+    return totalMessages > maxNumberOfMessagesInMemory;
+  }
+
+  /**
+   * Finds biggest partition and flushes it to the disk
+   *
+   * @throws IOException
+   */
+  private void flushOnePartition() throws IOException {
+    int maxMessages = 0;
+    FlushableMessageStore<I, M> biggestStore = null;
+    for (FlushableMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      int numMessages = messageStore.getNumberOfMessages();
+      if (numMessages > maxMessages) {
+        maxMessages = numMessages;
+        biggestStore = messageStore;
+      }
+    }
+    if (biggestStore != null) {
+      biggestStore.flush();
+    }
+  }
+
+  /**
+   * Get message store for partition which holds vertex with required vertex
+   * id
+   *
+   * @param vertexId Id of vertex for which we are asking for message store
+   * @return Requested message store
+   */
+  private FlushableMessageStore<I, M> getMessageStore(I vertexId) {
+    int partitionId =
+        service.getVertexPartitionOwner(vertexId).getPartitionId();
+    return getMessageStore(partitionId);
+  }
+
+  /**
+   * Get message store for partition id. If it doesn't exist yet,
+   * creates a new one.
+   *
+   * @param partitionId Id of partition for which we are asking for message
+   *                    store
+   * @return Requested message store
+   */
+  private FlushableMessageStore<I, M> getMessageStore(int partitionId) {
+    FlushableMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      return messageStore;
+    }
+    messageStore = fileStoreFactory.newStore();
+    FlushableMessageStore<I, M> store =
+        partitionMessageStores.putIfAbsent(partitionId, messageStore);
+    return (store == null) ? messageStore : store;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    FlushableMessageStore<I, M> partitionStore =
+        partitionMessageStores.get(partitionId);
+    out.writeBoolean(partitionStore != null);
+    if (partitionStore != null) {
+      partitionStore.write(out);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(partitionMessageStores.size());
+    for (Entry<Integer, FlushableMessageStore<I, M>> entry :
+        partitionMessageStores.entrySet()) {
+      out.writeInt(entry.getKey());
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    if (in.readBoolean()) {
+      FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+      messageStore.readFields(in);
+      partitionMessageStores.put(partitionId, messageStore);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numStores = in.readInt();
+    for (int s = 0; s < numStores; s++) {
+      int partitionId = in.readInt();
+      FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+      messageStore.readFields(in);
+      partitionMessageStores.put(partitionId, messageStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service             Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory    Factory for creating file stores when
+   *                            flushing
+   * @param <I>                 Vertex id
+   * @param <V>                 Vertex data
+   * @param <E>                 Edge data
+   * @param <M>                 Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+      CentralizedServiceWorker<I, V, E, M> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+        fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link DiskBackedMessageStoreByPartition}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, V, E, M> service;
+    /** Number of messages to keep in memory */
+    private final int maxMessagesInMemory;
+    /** Factory for creating file stores when flushing */
+    private final
+    MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
+
+    /**
+     * @param service             Service worker
+     * @param maxMessagesInMemory Number of messages to keep in memory
+     * @param fileStoreFactory    Factory for creating file stores when
+     *                            flushing
+     */
+    public Factory(CentralizedServiceWorker<I, V, E, M> service,
+        int maxMessagesInMemory,
+        MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+            fileStoreFactory) {
+      this.service = service;
+      this.maxMessagesInMemory = maxMessagesInMemory;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public MessageStoreByPartition<I, M> newStore() {
+      return new DiskBackedMessageStoreByPartition<I, V, E, M>(service,
+          maxMessagesInMemory, fileStoreFactory);
+    }
+  }
+
+}
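
The by-partition store is what lets current-superstep messages travel with a
partition when it moves between workers: the sender drains one partition's
entries, and the receiver replays them with addPartitionMessages(). A minimal
sketch of the two sides, written against MessageStoreByPartition; how
SendPartitionCurrentMessagesRequest applies the map on the server side is an
assumption here, since its body is not shown in this part of the diff.

  import java.io.IOException;
  import java.util.Collection;
  import java.util.Map;

  import org.apache.giraph.comm.messages.MessageStoreByPartition;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.LongWritable;

  import com.google.common.collect.Maps;

  public class PartitionExchangeSketch {
    /** Sending side: collect one partition's current messages into a map. */
    public static Map<LongWritable, Collection<DoubleWritable>> drain(
        MessageStoreByPartition<LongWritable, DoubleWritable> currentStore,
        int partitionId) throws IOException {
      Map<LongWritable, Collection<DoubleWritable>> map = Maps.newHashMap();
      for (LongWritable vertexId :
          currentStore.getPartitionDestinationVertices(partitionId)) {
        map.put(vertexId, currentStore.getVertexMessages(vertexId));
      }
      return map;
    }

    /** Receiving side: add the drained messages under the same partition. */
    public static void accept(
        MessageStoreByPartition<LongWritable, DoubleWritable> currentStore,
        Map<LongWritable, Collection<DoubleWritable>> map,
        int partitionId) throws IOException {
      currentStore.addPartitionMessages(map, partitionId);
    }
  }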

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Message store which has a flush operation
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public interface FlushableMessageStore<I extends WritableComparable,
+    M extends Writable> extends MessageStore<I, M> {
+  /**
+   * Flushes messages to the disk.
+   *
+   * @throws IOException
+   */
+  void flush() throws IOException;
+}
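
A rough illustration of the flush contract (hypothetical helper, not from this patch): a caller spills to disk once the in-memory message count reaches a configured limit.

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.FlushableMessageStore;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.IOException;

/** Hypothetical helper showing when flush() would typically be called. */
public class FlushExample {
  /** Spills in-memory messages to disk once the configured limit is hit. */
  public static <I extends WritableComparable, M extends Writable>
  void maybeFlush(FlushableMessageStore<I, M> store,
      int maxMessagesInMemory) throws IOException {
    // getNumberOfMessages() counts only what is currently held in memory.
    if (store.getNumberOfMessages() >= maxMessagesInMemory) {
      store.flush();
    }
  }
}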

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Message store
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public interface MessageStore<I extends WritableComparable,
+    M extends Writable> extends BasicMessageStore<I, M> {
+  /**
+   * Adds messages
+   *
+   * @param vertexId Vertex id to which the messages are addressed
+   * @param messages Messages for the vertex
+   * @throws IOException
+   */
+  void addVertexMessages(I vertexId,
+      Collection<M> messages) throws IOException;
+
+  /**
+   * Get number of messages in memory
+   *
+   * @return Number of messages in memory
+   */
+  int getNumberOfMessages();
+
+  /**
+   * Check if we have messages for some vertex
+   *
+   * @param vertexId Id of vertex which we want to check
+   * @return True iff we have messages for the vertex with the given id
+   */
+  boolean hasMessagesForVertex(I vertexId);
+
+  /**
+   * Gets vertex ids which we have messages for
+   *
+   * @return Iterable over vertex ids which we have messages for
+   */
+  Iterable<I> getDestinationVertices();
+}
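
A hypothetical sketch of how this interface is typically consumed: iterate the destination vertices, read each vertex's messages, then clear them (helper class invented for illustration).

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.MessageStore;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.IOException;
import java.util.Collection;

/** Hypothetical helper that drains a message store. */
public class MessageStoreDrainExample {
  /** Counts all stored messages, clearing each vertex after reading it. */
  public static <I extends WritableComparable, M extends Writable>
  int countAndClear(MessageStore<I, M> store) throws IOException {
    int count = 0;
    for (I vertexId : store.getDestinationVertices()) {
      if (store.hasMessagesForVertex(vertexId)) {
        Collection<M> messages = store.getVertexMessages(vertexId);
        count += messages.size();
        store.clearVertexMessages(vertexId);
      }
    }
    return count;
  }
}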

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Message store which stores data by partition
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public interface MessageStoreByPartition<I extends WritableComparable,
+    M extends Writable> extends MessageStore<I, M> {
+  /**
+   * Adds messages for partition
+   *
+   * @param messages    Map of messages we want to add
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void addPartitionMessages(Map<I, Collection<M>> messages,
+      int partitionId) throws IOException;
+
+  /**
+   * Gets vertex ids from the selected partition which we have messages for
+   *
+   * @param partitionId Id of partition
+   * @return Iterable over vertex ids which we have messages for
+   */
+  Iterable<I> getPartitionDestinationVertices(int partitionId);
+
+  /**
+   * Clears messages for a partition.
+   *
+   * @param partitionId Partition id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearPartition(int partitionId) throws IOException;
+
+  /**
+   * Serialize messages for one partition.
+   *
+   * @param out         {@link DataOutput} to serialize the partition's
+   *                    messages into
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void writePartition(DataOutput out, int partitionId) throws IOException;
+
+  /**
+   * Deserialize messages for one partition
+   *
+   * @param in          {@link DataInput} to deserialize the partition's
+   *                    messages from
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException;
+}
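
For illustration only, a hypothetical helper showing how the per-partition serialization hooks could move one partition's messages between stores (names invented here, not part of the patch):

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** Hypothetical helper for moving one partition's messages between stores. */
public class PartitionMessageMoveExample {
  /** Serializes a partition's messages and drops them from the source. */
  public static <I extends WritableComparable, M extends Writable>
  void moveOut(MessageStoreByPartition<I, M> source, int partitionId,
      DataOutput out) throws IOException {
    source.writePartition(out, partitionId);
    source.clearPartition(partitionId);
  }

  /** Reads a partition's messages into the target store. */
  public static <I extends WritableComparable, M extends Writable>
  void moveIn(MessageStoreByPartition<I, M> target, int partitionId,
      DataInput in) throws IOException {
    target.readFieldsForPartition(in, partitionId);
  }
}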

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for message stores
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @param <S> Message store
+ */
+public interface MessageStoreFactory<I extends WritableComparable,
+    M extends Writable, S extends BasicMessageStore<I, M>> {
+  /**
+   * Creates new message store
+   *
+   * @return New message store
+   */
+  S newStore();
+}
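
Since the factory has a single method, it is easy to decorate; as a purely illustrative example (not in the patch), a delegating factory that counts how many stores it has produced:

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.BasicMessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory decorator that counts created stores. */
public class CountingMessageStoreFactory<I extends WritableComparable,
    M extends Writable, S extends BasicMessageStore<I, M>>
    implements MessageStoreFactory<I, M, S> {
  /** Factory doing the real work */
  private final MessageStoreFactory<I, M, S> delegate;
  /** Number of stores created so far */
  private final AtomicInteger created = new AtomicInteger();

  /** @param delegate Factory doing the real work */
  public CountingMessageStoreFactory(MessageStoreFactory<I, M, S> delegate) {
    this.delegate = delegate;
  }

  @Override
  public S newStore() {
    created.incrementAndGet();
    return delegate.newStore();
  }

  /** @return How many stores this factory has created */
  public int getCreatedCount() {
    return created.get();
  }
}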

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.giraph.comm.SendPartitionMessagesRequest;
+import org.apache.giraph.comm.ServerData;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Send a collection of vertex messages for a partition. It adds the messages
+ * to the current message store and should be used only during partition
+ * exchange.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    SendPartitionMessagesRequest<I, V, E, M> {
+  /** Constructor used for reflection only */
+  public SendPartitionCurrentMessagesRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param partitionId Partition to send the request to
+   * @param vertexIdMessages Map of messages to send
+   */
+  public SendPartitionCurrentMessagesRequest(int partitionId,
+      Map<I, Collection<M>> vertexIdMessages) {
+    super(partitionId, vertexIdMessages);
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SEND_PARTITION_CURRENT_MESSAGES_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E, M> serverData) {
+    try {
+      serverData.getCurrentMessageStore().addPartitionMessages(
+          getVertexIdMessages(), getPartitionId());
+    } catch (IOException e) {
+      throw new RuntimeException("doRequest: Got IOException ", e);
+    }
+  }
+}
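
On the sending side, constructing this request only requires a partition id and a map of vertex messages; a hypothetical sketch with arbitrary types and values:

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.Collection;
import java.util.Map;

/** Hypothetical construction of the request during partition exchange. */
public class SendCurrentMessagesExample {
  /** @return A request carrying one message for vertex 1 in partition 0 */
  public static SendPartitionCurrentMessagesRequest<LongWritable,
      DoubleWritable, FloatWritable, DoubleWritable> buildRequest() {
    Map<LongWritable, Collection<DoubleWritable>> vertexIdMessages =
        Maps.newHashMap();
    vertexIdMessages.put(new LongWritable(1L),
        Lists.<DoubleWritable>newArrayList(new DoubleWritable(0.5)));
    // Partition id 0 is a placeholder; in practice it identifies the
    // partition being moved to another worker.
    return new SendPartitionCurrentMessagesRequest<LongWritable,
        DoubleWritable, FloatWritable, DoubleWritable>(0, vertexIdMessages);
  }
}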

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Used for writing and reading a collection of messages on disk. {@link
+ * #addMessages(java.util.Map)} should be called only once with the messages
+ * we want to store.
+ * <p/>
+ * It's optimized for retrieving messages in the natural order of vertex ids
+ * they are sent to.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class SequentialFileMessageStore<I extends WritableComparable,
+    M extends Writable> implements BasicMessageStore<I, M> {
+  /** File in which we store data */
+  private final File file;
+  /** Configuration which we need for reading data */
+  private final Configuration config;
+  /** Buffer size to use when reading and writing files */
+  private final int bufferSize;
+  /** File input stream */
+  private DataInputStream in;
+  /** How many vertices do we have left to read in the file */
+  private int verticesLeft;
+  /** Id of currently read vertex */
+  private I currentVertexId;
+
+  /**
+   * Creates a store which keeps its messages in the given file.
+   *
+   * @param config     Configuration used later for reading
+   * @param bufferSize Buffer size to use when reading and writing
+   * @param fileName   File in which we want to store messages
+   */
+  public SequentialFileMessageStore(Configuration config, int bufferSize,
+      String fileName) {
+    this.config = config;
+    this.bufferSize = bufferSize;
+    file = new File(fileName);
+  }
+
+  @Override
+  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
+    SortedMap<I, Collection<M>> map;
+    if (!(messages instanceof SortedMap)) {
+      map = Maps.newTreeMap();
+      map.putAll(messages);
+    } else {
+      map = (SortedMap<I, Collection<M>>) messages;
+    }
+    writeToFile(map);
+  }
+
+  /**
+   * Writes messages to its file.
+   *
+   * @param messages Messages to write
+   * @throws IOException
+   */
+  private void writeToFile(SortedMap<I, Collection<M>> messages) throws
+      IOException {
+    if (file.exists()) {
+      file.delete();
+    }
+    file.createNewFile();
+    DataOutputStream out = null;
+
+    try {
+      out = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file), bufferSize));
+      out.writeInt(messages.size());
+      for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+        entry.getKey().write(out);
+        out.writeInt(entry.getValue().size());
+        for (M message : entry.getValue()) {
+          message.write(out);
+        }
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  /**
+   * Reads messages for a vertex. It will find the messages only if all
+   * previous reads used smaller vertex ids than this one; messages should
+   * be retrieved in increasing order of vertex ids.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Messages for the selected vertex, or an empty list if none are
+   *         available at the current read position
+   * @throws IOException
+   */
+  @Override
+  public Collection<M> getVertexMessages(I vertexId) throws
+      IOException {
+    if (in == null) {
+      startReading();
+    }
+
+    I nextVertexId = getCurrentVertexId();
+    while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
+      nextVertexId = getNextVertexId();
+    }
+
+    if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
+      return Collections.emptyList();
+    }
+    return readMessagesForCurrentVertex();
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
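+    // No-op: messages for individual vertices are only removed when
+    // clearAll() deletes the whole file.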
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    endReading();
+    file.delete();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(file.length());
+    FileInputStream input = new FileInputStream(file);
+    try {
+      byte[] buffer = new byte[bufferSize];
+      while (true) {
+        int length = input.read(buffer);
+        if (length < 0) {
+          break;
+        }
+        out.write(buffer, 0, length);
+      }
+    } finally {
+      input.close();
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    FileOutputStream output = new FileOutputStream(file);
+    try {
+      long fileLength = in.readLong();
+      byte[] buffer = new byte[bufferSize];
+      for (long position = 0; position < fileLength; position += bufferSize) {
+        int bytes = (int) Math.min(bufferSize, fileLength - position);
+        in.readFully(buffer, 0, bytes);
+        // Write only the bytes read for this chunk, not the whole buffer
+        output.write(buffer, 0, bytes);
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  /**
+   * Prepare for reading
+   *
+   * @throws IOException
+   */
+  private void startReading() throws IOException {
+    currentVertexId = null;
+    in = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file), bufferSize));
+    verticesLeft = in.readInt();
+  }
+
+  /**
+   * Gets current vertex id.
+   * <p/>
+   * If there is a vertex id whose messages haven't been read yet it
+   * will return that vertex id, otherwise it will read and return the next
+   * one.
+   *
+   * @return Current vertex id
+   * @throws IOException
+   */
+  private I getCurrentVertexId() throws IOException {
+    if (currentVertexId != null) {
+      return currentVertexId;
+    } else {
+      return getNextVertexId();
+    }
+  }
+
+  /**
+   * Gets next vertex id.
+   * <p/>
+   * If there is a vertex whose messages haven't been read yet it
+   * will read and skip over its messages to get to the next vertex.
+   *
+   * @return Next vertex id
+   * @throws IOException
+   */
+  private I getNextVertexId() throws IOException {
+    if (currentVertexId != null) {
+      readMessagesForCurrentVertex();
+    }
+    if (verticesLeft == 0) {
+      return null;
+    }
+    currentVertexId = BspUtils.<I>createVertexId(config);
+    currentVertexId.readFields(in);
+    return currentVertexId;
+  }
+
+  /**
+   * Reads messages for current vertex.
+   *
+   * @return Messages for current vertex
+   * @throws IOException
+   */
+  private Collection<M> readMessagesForCurrentVertex() throws IOException {
+    int messagesSize = in.readInt();
+    ArrayList<M> messages = Lists.newArrayList();
+    for (int i = 0; i < messagesSize; i++) {
+      M message = BspUtils.<M>createMessageValue(config);
+      message.readFields(in);
+      messages.add(message);
+    }
+    currentVertexDone();
+    return messages;
+  }
+
+  /**
+   * Release current vertex.
+   *
+   * @throws IOException
+   */
+  private void currentVertexDone() throws IOException {
+    currentVertexId = null;
+    verticesLeft--;
+    if (verticesLeft == 0) {
+      endReading();
+    }
+  }
+
+  /**
+   * Call when we are done reading, for closing files.
+   *
+   * @throws IOException
+   */
+  private void endReading() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config Hadoop configuration
+   * @param <I>    Vertex id
+   * @param <M>    Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
+      Configuration config) {
+    return new Factory<I, M>(config);
+  }
+
+  /**
+   * Factory for {@link SequentialFileMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable>
+      implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final Configuration config;
+    /** Directory in which we'll keep necessary files */
+    private final String directory;
+    /** Buffer size to use when reading and writing */
+    private final int bufferSize;
+    /** Counter for created message stores */
+    private final AtomicInteger storeCounter;
+
+    /** @param config Hadoop configuration */
+    public Factory(Configuration config) {
+      this.config = config;
+      String jobId = config.get("mapred.job.id", "Unknown Job");
+      this.directory = config.get(GiraphJob.MESSAGES_DIRECTORY,
+          GiraphJob.MESSAGES_DIRECTORY_DEFAULT) + jobId + File.separator;
+      this.bufferSize = config.getInt(GiraphJob.MESSAGES_BUFFER_SIZE,
+          GiraphJob.MESSAGES_BUFFER_SIZE_DEFAULT);
+      storeCounter = new AtomicInteger();
+      new File(directory).mkdirs();
+    }
+
+    @Override
+    public BasicMessageStore<I, M> newStore() {
+      String fileName = directory + storeCounter.getAndIncrement();
+      return new SequentialFileMessageStore<I, M>(config, bufferSize,
+          fileName);
+    }
+  }
+}
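
A write-side usage sketch with a hypothetical file name and buffer size (not part of the patch). addMessages() is meant to be called once per store, and the read path is omitted because getVertexMessages() also needs the vertex id and message value classes registered in the Configuration (the store instantiates them through BspUtils):

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.comm.messages.SequentialFileMessageStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.SortedMap;

/** Hypothetical write-side usage of the sequential file store. */
public class SequentialFileStoreExample {
  /** Writes two vertices' messages to a temporary file, then cleans up. */
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    String fileName = System.getProperty("java.io.tmpdir") + File.separator +
        "giraph-messages-example";
    SequentialFileMessageStore<LongWritable, DoubleWritable> store =
        new SequentialFileMessageStore<LongWritable, DoubleWritable>(
            conf, 8192, fileName);
    // Messages are sorted by vertex id before being written to the file.
    SortedMap<LongWritable, Collection<DoubleWritable>> messages =
        Maps.newTreeMap();
    messages.put(new LongWritable(1L),
        Lists.<DoubleWritable>newArrayList(new DoubleWritable(0.25)));
    messages.put(new LongWritable(2L),
        Lists.<DoubleWritable>newArrayList(new DoubleWritable(0.75)));
    store.addMessages(messages);
    // Deletes the backing file.
    store.clearAll();
  }
}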

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Sun Aug  5 00:17:12 2012
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.utils.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Simple in memory message store implemented with a map
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class SimpleMessageStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements MessageStoreByPartition<I, M> {
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+  /**
+   * Internal message map, from partition id to map from vertex id to
+   * messages
+   */
+  private final ConcurrentMap<Integer, ConcurrentMap<I, Collection<M>>> map;
+  /** Hadoop configuration */
+  private final Configuration config;
+  /** Combiner for messages */
+  private final VertexCombiner<I, M> combiner;
+
+  /**
+   * @param service  Service worker
+   * @param combiner Combiner for messages
+   * @param config   Hadoop configuration
+   */
+  SimpleMessageStore(CentralizedServiceWorker<I, V, E, M> service,
+      VertexCombiner<I, M> combiner, Configuration config) {
+    this.service = service;
+    map = Maps.newConcurrentMap();
+    this.combiner = combiner;
+    this.config = config;
+  }
+
+  @Override
+  public void addVertexMessages(I vertexId,
+      Collection<M> messages) throws IOException {
+    int partitionId = getPartitionId(vertexId);
+    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    if (partitionMap == null) {
+      partitionMap = map.putIfAbsent(partitionId,
+          Maps.<I, Collection<M>>newConcurrentMap());
+      if (partitionMap == null) {
+        partitionMap = map.get(partitionId);
+      }
+    }
+
+    Collection<M> currentMessages =
+        CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
+    if (combiner != null) {
+      synchronized (currentMessages) {
+        currentMessages =
+            Lists.newArrayList(combiner.combine(vertexId, currentMessages));
+        partitionMap.put(vertexId, currentMessages);
+      }
+    }
+  }
+
+  @Override
+  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
+    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
+      addVertexMessages(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void addPartitionMessages(Map<I, Collection<M>> messages,
+      int partitionId) throws IOException {
+    addMessages(messages);
+  }
+
+  @Override
+  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        map.get(getPartitionId(vertexId));
+    return (partitionMap == null) ? Collections.<M>emptyList() :
+        partitionMap.get(vertexId);
+  }
+
+  @Override
+  public int getNumberOfMessages() {
+    int numberOfMessages = 0;
+    for (ConcurrentMap<I, Collection<M>> partitionMap : map.values()) {
+      for (Collection<M> messages : partitionMap.values()) {
+        numberOfMessages += messages.size();
+      }
+    }
+    return numberOfMessages;
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        map.get(getPartitionId(vertexId));
+    return partitionMap != null && partitionMap.containsKey(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getDestinationVertices() {
+    List<I> vertices = Lists.newArrayList();
+    for (ConcurrentMap<I, Collection<M>> partitionMap : map.values()) {
+      vertices.addAll(partitionMap.keySet());
+    }
+    return vertices;
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    return (partitionMap == null) ? Collections.<I>emptyList() :
+        partitionMap.keySet();
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    ConcurrentMap<I, Collection<M>> partitionMap =
+        map.get(getPartitionId(vertexId));
+    if (partitionMap != null) {
+      partitionMap.remove(vertexId);
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.remove(partitionId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  /**
+   * Get the id of the partition which holds the vertex with the given id
+   *
+   * @param vertexId Id of vertex
+   * @return Id of partition
+   */
+  private int getPartitionId(I vertexId) {
+    return service.getVertexPartitionOwner(vertexId).getPartitionId();
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    out.writeBoolean(partitionMap != null);
+    if (partitionMap != null) {
+      out.writeInt(partitionMap.size());
+      for (Entry<I, Collection<M>> entry : partitionMap.entrySet()) {
+        entry.getKey().write(out);
+        out.writeInt(entry.getValue().size());
+        for (M message : entry.getValue()) {
+          message.write(out);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(map.size());
+    for (int partitionId : map.keySet()) {
+      out.writeInt(partitionId);
+      writePartition(out, partitionId);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    if (in.readBoolean()) {
+      ConcurrentMap<I, Collection<M>> partitionMap = Maps.newConcurrentMap();
+      int numVertices = in.readInt();
+      for (int v = 0; v < numVertices; v++) {
+        I vertexId = BspUtils.<I>createVertexId(config);
+        vertexId.readFields(in);
+        int numMessages = in.readInt();
+        List<M> messages = Lists.newArrayList();
+        for (int m = 0; m < numMessages; m++) {
+          M message = BspUtils.<M>createMessageValue(config);
+          message.readFields(in);
+          messages.add(message);
+        }
+        partitionMap.put(vertexId, messages);
+      }
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numPartitions = in.readInt();
+    for (int p = 0; p < numPartitions; p++) {
+      int partitionId = in.readInt();
+      readFieldsForPartition(in, partitionId);
+    }
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service Worker service
+   * @param config  Hadoop configuration
+   * @param <I>     Vertex id
+   * @param <V>     Vertex data
+   * @param <E>     Edge data
+   * @param <M>     Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+      CentralizedServiceWorker<I, V, E, M> service, Configuration config) {
+    return new Factory<I, V, E, M>(service, config);
+  }
+
+  /**
+   * Factory for {@link SimpleMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, V, E, M> service;
+    /** Hadoop configuration */
+    private final Configuration config;
+    /** Combiner for messages */
+    private final VertexCombiner<I, M> combiner;
+
+    /**
+     * @param service Worker service
+     * @param config  Hadoop configuration
+     */
+    public Factory(CentralizedServiceWorker<I, V, E, M> service,
+        Configuration config) {
+      this.service = service;
+      this.config = config;
+      if (BspUtils.getVertexCombinerClass(config) == null) {
+        combiner = null;
+      } else {
+        combiner = BspUtils.createVertexCombiner(config);
+      }
+    }
+
+    @Override
+    public MessageStoreByPartition<I, M> newStore() {
+      return new SimpleMessageStore<I, V, E, M>(service, combiner, config);
+    }
+  }
+}
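
Finally, a hypothetical sketch of obtaining a fresh in-memory store through the factory method above; the wrapper class is invented for illustration, and the worker service is assumed to be supplied by the caller:

package org.apache.giraph.examples; // hypothetical package

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.SimpleMessageStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Hypothetical creation of an in-memory message store on a worker. */
public class SimpleStoreFactoryExample {
  /** @return A new in-memory store backed by the given worker service */
  public static <I extends WritableComparable, V extends Writable,
      E extends Writable, M extends Writable>
  MessageStoreByPartition<I, M> createStore(
      CentralizedServiceWorker<I, V, E, M> service, Configuration conf) {
    MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> factory =
        SimpleMessageStore.newFactory(service, conf);
    return factory.newStore();
  }
}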


