giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [1/2] GIRAPH-697: Clean up message stores (majakabiljo)
Date Tue, 25 Jun 2013 23:22:33 GMT
Updated Branches:
  refs/heads/trunk f8a3c777e -> 89445670e


http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
new file mode 100644
index 0000000..1cda1d9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Message store which separates data by partitions,
+ * and submits them to underlying message store.
+ * <p/>
+ * Each partition gets its own {@link PartitionDiskBackedMessageStore}.
+ * After a batch of messages is added, the total number of in-memory messages
+ * over all partitions is compared with a limit, and while the limit is
+ * exceeded the partition store holding the most in-memory messages is
+ * flushed.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MessageStore<I, M> {
+  /** Message class */
+  private final Class<M> messageClass;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> service;
+  /** Number of messages to keep in memory */
+  private final int maxNumberOfMessagesInMemory;
+  /** Factory for creating file stores when flushing */
+  private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+  partitionStoreFactory;
+  /** Map from partition id to its message store */
+  private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>>
+  partitionMessageStores;
+
+  /**
+   * @param messageClass                Message class held in the store
+   * @param service                     Service worker
+   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
+   * @param partitionStoreFactory       Factory for creating stores for a
+   *                                    partition
+   */
+  public DiskBackedMessageStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, V, E> service,
+      int maxNumberOfMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
+          M>> partitionStoreFactory) {
+    this.messageClass = messageClass;
+    this.service = service;
+    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
+    this.partitionStoreFactory = partitionStoreFactory;
+    partitionMessageStores = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
+        getMessageStore(partitionId);
+    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+        vertexIdMessageIterator =
+        messages.getVertexIdMessageIterator();
+    while (vertexIdMessageIterator.hasNext()) {
+      vertexIdMessageIterator.next();
+      boolean ownsVertexId =
+          partitionMessageStore.addVertexMessages(
+              vertexIdMessageIterator.getCurrentVertexId(),
+              Collections.singleton(
+                  vertexIdMessageIterator.getCurrentMessage()));
+      // The partition store kept a reference to the current vertex id;
+      // releasing it presumably stops the iterator from reusing the object.
+      if (ownsVertexId) {
+        vertexIdMessageIterator.releaseCurrentVertexId();
+      }
+    }
+    checkMemory();
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    if (messageStore == null || !messageStore.hasMessagesForVertex(vertexId)) {
+      return EmptyIterable.get();
+    }
+    return messageStore.getVertexMessages(vertexId);
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    return messageStore != null &&
+        messageStore.hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore == null) {
+      return Collections.emptyList();
+    } else {
+      return messageStore.getDestinationVertices();
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    if (messageStore != null && messageStore.hasMessagesForVertex(vertexId)) {
+      messageStore.clearVertexMessages(vertexId);
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    // Drop the store from the map as well, so an emptied store is neither
+    // kept alive nor reported as present by writePartition()
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.remove(partitionId);
+    if (messageStore != null) {
+      messageStore.clearAll();
+    }
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      messageStore.clearAll();
+    }
+    partitionMessageStores.clear();
+  }
+
+  /**
+   * Checks the memory status, flushes if necessary
+   *
+   * @throws IOException
+   */
+  private void checkMemory() throws IOException {
+    while (memoryFull()) {
+      if (!flushOnePartition()) {
+        // No partition has in-memory messages left (e.g. the configured
+        // limit is negative), so flushing cannot make progress; stop instead
+        // of spinning forever.
+        break;
+      }
+    }
+  }
+
+  /**
+   * Check if memory is full
+   *
+   * @return True iff memory is full
+   */
+  private boolean memoryFull() {
+    // Sum in a long so many large partitions cannot overflow the total
+    long totalMessages = 0;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      totalMessages += messageStore.getNumberOfMessages();
+    }
+    return totalMessages > maxNumberOfMessagesInMemory;
+  }
+
+  /**
+   * Finds biggest partition and flushes it to the disk
+   *
+   * @return True iff some partition store had in-memory messages and was
+   *         flushed
+   * @throws IOException
+   */
+  private boolean flushOnePartition() throws IOException {
+    int maxMessages = 0;
+    PartitionDiskBackedMessageStore<I, M> biggestStore = null;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      int numMessages = messageStore.getNumberOfMessages();
+      if (numMessages > maxMessages) {
+        maxMessages = numMessages;
+        biggestStore = messageStore;
+      }
+    }
+    if (biggestStore == null) {
+      return false;
+    }
+    biggestStore.flush();
+    return true;
+  }
+
+  /**
+   * Get the message store for the partition which holds the vertex with the
+   * required id, without creating one if it does not exist yet.
+   *
+   * @param vertexId Id of vertex for which we are asking for message store
+   * @return Requested message store, or null if none exists yet
+   */
+  private PartitionDiskBackedMessageStore<I, M> getExistingMessageStore(
+      I vertexId) {
+    int partitionId =
+        service.getVertexPartitionOwner(vertexId).getPartitionId();
+    return partitionMessageStores.get(partitionId);
+  }
+
+  /**
+   * Get message store for partition id. If it doesn't exist yet,
+   * creates a new one.
+   *
+   * @param partitionId Id of partition for which we are asking for message
+   *                    store
+   * @return Requested message store
+   */
+  private PartitionDiskBackedMessageStore<I, M> getMessageStore(
+      int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      return messageStore;
+    }
+    messageStore = partitionStoreFactory.newStore(messageClass);
+    // Another thread may have registered a store in the meantime; keep the
+    // one that won the race
+    PartitionDiskBackedMessageStore<I, M> store =
+        partitionMessageStores.putIfAbsent(partitionId, messageStore);
+    return (store == null) ? messageStore : store;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionStore =
+        partitionMessageStores.get(partitionId);
+    out.writeBoolean(partitionStore != null);
+    if (partitionStore != null) {
+      partitionStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    if (in.readBoolean()) {
+      PartitionDiskBackedMessageStore<I, M> messageStore =
+          partitionStoreFactory.newStore(messageClass);
+      messageStore.readFields(in);
+      partitionMessageStores.put(partitionId, messageStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service             Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory    Factory for creating file stores when
+   *                            flushing
+   * @param <I>                 Vertex id
+   * @param <V>                 Vertex data
+   * @param <E>                 Edge data
+   * @param <M>                 Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
+      CentralizedServiceWorker<I, V, E> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+        fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link DiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, V, E> service;
+    /** Number of messages to keep in memory */
+    private final int maxMessagesInMemory;
+    /** Factory for creating file stores when flushing */
+    private final
+    MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param service             Service worker
+     * @param maxMessagesInMemory Number of messages to keep in memory
+     * @param fileStoreFactory    Factory for creating file stores when
+     *                            flushing
+     */
+    public Factory(CentralizedServiceWorker<I, V, E> service,
+        int maxMessagesInMemory,
+        MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+            fileStoreFactory) {
+      this.service = service;
+      this.maxMessagesInMemory = maxMessagesInMemory;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
+      return new DiskBackedMessageStore<I, V, E, M>(messageClass,
+          service, maxMessagesInMemory, fileStoreFactory);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
new file mode 100644
index 0000000..4ae805a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message storage with in-memory map of messages and with support for
+ * flushing all the messages to the disk. Holds messages for a single partition.
+ * <p/>
+ * Each {@link #flush()} moves the current in-memory map into a new
+ * {@link SequentialFileMessageStore}; {@link #getVertexMessages} combines the
+ * in-memory messages with the ones read back from every file store.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class PartitionDiskBackedMessageStore<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** Message class */
+  private final Class<M> messageClass;
+  /**
+   * In-memory message map (must be sorted to ensure that the ids are
+   * ordered)
+   */
+  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+  inMemoryMessages;
+  /** Hadoop configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Counter for number of messages in-memory */
+  private final AtomicInteger numberOfMessagesInMemory;
+  /** To keep vertex ids which we have messages for */
+  private final Set<I> destinationVertices;
+  /** File stores in which we keep flushed messages */
+  private final Collection<SequentialFileMessageStore<I, M>> fileStores;
+  /** Factory for creating file stores when flushing */
+  private final
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
+  /**
+   * Lock for disk flushing: adders hold the read lock while writing into the
+   * in-memory map, {@link #flush()} holds the write lock while swapping it
+   */
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
+  /**
+   * Constructor.
+   *
+   * @param messageClass     Message class held in the store
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public PartitionDiskBackedMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+    this.messageClass = messageClass;
+    this.config = config;
+    numberOfMessagesInMemory = new AtomicInteger(0);
+    destinationVertices =
+        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
+    fileStores = Lists.newArrayList();
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  /**
+   * Add vertex messages
+   *
+   * @param vertexId Vertex id to use
+   * @param messages Messages to add (note that the lifetime of the messages
+   *                 is only until next() is called again)
+   * @return True if the vertex id ownership is taken by this method (this
+   *         store retains a reference to {@code vertexId}, so the caller must
+   *         not reuse the object), false otherwise
+   * @throws IOException
+   */
+  boolean addVertexMessages(I vertexId,
+                            Iterable<M> messages) throws IOException {
+    // When the id is newly inserted into destinationVertices, the set keeps a
+    // reference to this very object, so ownership is taken even if another
+    // thread wins the putIfAbsent below.
+    boolean ownsVertexId = destinationVertices.add(vertexId);
+    rwLock.readLock().lock();
+    try {
+      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+      if (extendedDataOutput == null) {
+        ExtendedDataOutput newExtendedDataOutput =
+            config.createExtendedDataOutput();
+        extendedDataOutput =
+            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
+        if (extendedDataOutput == null) {
+          // The map now uses this object as a key
+          ownsVertexId = true;
+          extendedDataOutput = newExtendedDataOutput;
+        }
+      }
+
+      // Serialize appends into the per-vertex buffer shared between threads
+      synchronized (extendedDataOutput) {
+        for (M message : messages) {
+          message.write(extendedDataOutput);
+          numberOfMessagesInMemory.getAndIncrement();
+        }
+      }
+    } finally {
+      rwLock.readLock().unlock();
+    }
+
+    return ownsVertexId;
+  }
+
+  /**
+   * Get the messages for a vertex: the in-memory ones followed by the ones
+   * stored in each file store, in flush order.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Iterable of messages for a vertex id
+   * @throws IOException
+   */
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
+    }
+    Iterable<M> combinedIterable = new MessagesIterable<M>(
+        config, messageClass,
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      combinedIterable = Iterables.concat(combinedIterable,
+          fileStore.getVertexMessages(vertexId));
+    }
+    return combinedIterable;
+  }
+
+  /**
+   * Get number of messages in memory
+   *
+   * @return Number of messages in memory
+   */
+  public int getNumberOfMessages() {
+    return numberOfMessagesInMemory.get();
+  }
+
+  /**
+   * Check if we have messages for some vertex
+   *
+   * @param vertexId Id of vertex which we want to check
+   * @return True iff we have messages for vertex with required id
+   */
+  public boolean hasMessagesForVertex(I vertexId) {
+    return destinationVertices.contains(vertexId);
+  }
+
+  /**
+   * Gets vertex ids which we have messages for
+   *
+   * @return Iterable over vertex ids which we have messages for
+   */
+  public Iterable<I> getDestinationVertices() {
+    return destinationVertices;
+  }
+
+  /**
+   * Clears in-memory messages for a vertex.
+   * <p/>
+   * NOTE(review): only the in-memory map is touched; destinationVertices,
+   * the in-memory message counter and already flushed file stores are left
+   * as they are -- confirm this is the intended contract.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  public void clearVertexMessages(I vertexId) throws IOException {
+    inMemoryMessages.remove(vertexId);
+  }
+
+  /**
+   * Clears all resources used by this store, including the in-memory
+   * message counter and the files of flushed messages.
+   *
+   * @throws IOException
+   */
+  public void clearAll() throws IOException {
+    inMemoryMessages.clear();
+    // Reset the counter too, otherwise a cleared store keeps reporting its
+    // old size to the memory check
+    numberOfMessagesInMemory.set(0);
+    destinationVertices.clear();
+    synchronized (fileStores) {
+      for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+        fileStore.clearAll();
+      }
+      fileStores.clear();
+    }
+  }
+
+  /**
+   * Flushes messages to the disk. Does nothing on disk when there are no
+   * in-memory messages.
+   *
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush;
+    rwLock.writeLock().lock();
+    try {
+      // Swap in a fresh map; adders are blocked while we hold the write lock
+      messagesToFlush = inMemoryMessages;
+      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+      numberOfMessagesInMemory.set(0);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    if (messagesToFlush.isEmpty()) {
+      // Avoid creating (and later reading) an empty file
+      return;
+    }
+    SequentialFileMessageStore<I, M> fileStore =
+        fileStoreFactory.newStore(messageClass);
+    fileStore.addMessages(messagesToFlush);
+
+    synchronized (fileStores) {
+      fileStores.add(fileStore);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write destination vertices
+    out.writeInt(destinationVertices.size());
+    for (I vertexId : destinationVertices) {
+      vertexId.write(out);
+    }
+
+    // write of in-memory messages
+    out.writeInt(numberOfMessagesInMemory.get());
+
+    // write in-memory messages map: id, byte length, serialized messages
+    out.writeInt(inMemoryMessages.size());
+    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
+      entry.getKey().write(out);
+      out.writeInt(entry.getValue().getPos());
+      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
+    }
+
+    // write file stores
+    out.writeInt(fileStores.size());
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      fileStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read destination vertices
+    int numVertices = in.readInt();
+    for (int v = 0; v < numVertices; v++) {
+      I vertexId = config.createVertexId();
+      vertexId.readFields(in);
+      destinationVertices.add(vertexId);
+    }
+
+    // read in-memory messages
+    numberOfMessagesInMemory.set(in.readInt());
+
+    // read in-memory map (mirrors write(): id, byte length, bytes)
+    int mapSize = in.readInt();
+    for (int m = 0; m < mapSize; m++) {
+      I vertexId = config.createVertexId();
+      vertexId.readFields(in);
+      int messageBytes = in.readInt();
+      byte[] buf = new byte[messageBytes];
+      // The serialized messages must actually be consumed from the input,
+      // otherwise the buffer stays zeroed and the stream is left misaligned
+      in.readFully(buf, 0, messageBytes);
+      ExtendedDataOutput extendedDataOutput =
+          config.createExtendedDataOutput(buf, messageBytes);
+      inMemoryMessages.put(vertexId, extendedDataOutput);
+    }
+
+    // read file stores
+    int numFileStores = in.readInt();
+    for (int s = 0; s < numFileStores; s++) {
+      SequentialFileMessageStore<I, M> fileStore =
+          fileStoreFactory.newStore(messageClass);
+      fileStore.readFields(in);
+      fileStores.add(fileStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating message stores for
+   *                         partitions
+   * @param <I>              Vertex id
+   * @param <M>              Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, M>(config, fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link PartitionDiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable> implements MessageStoreFactory<I, M,
+      PartitionDiskBackedMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Factory for creating message stores for partitions */
+    private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param config           Hadoop configuration
+     * @param fileStoreFactory Factory for creating message stores for
+     *                         partitions
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+        MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+            fileStoreFactory) {
+      this.config = config;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public PartitionDiskBackedMessageStore<I, M> newStore(
+        Class<M> messageClass) {
+      return new PartitionDiskBackedMessageStore<I, M>(messageClass, config,
+          fileStoreFactory);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
new file mode 100644
index 0000000..e5931c6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+
+/**
+ * Used for writing and reading collection of messages to the disk.
+ * {@link SequentialFileMessageStore#addMessages(NavigableMap)}
+ * should be called only once with the messages we want to store.
+ * <p/>
+ * It's optimized for retrieving messages in the natural order of vertex ids
+ * they are sent to.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class SequentialFileMessageStore<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SequentialFileMessageStore.class);
+  /** Message class, passed to {@link MessagesIterable} to read messages */
+  private final Class<M> messageClass;
+  /** File in which we store data (deleted and recreated by addMessages) */
+  private final File file;
+  /** Configuration which we need for reading data */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Buffer size to use when reading and writing files */
+  private final int bufferSize;
+  /**
+   * Input stream over {@link #file}; null until the first
+   * getVertexMessages() call opens it through startReading()
+   */
+  private DataInputStream in;
+  /** How many vertices do we have left to read in the file */
+  private int verticesLeft;
+  /** Id of currently read vertex (reset to null when reading starts) */
+  private I currentVertexId;
+
+  /**
+   * Creates a store whose messages live in the given file. The constructor
+   * itself only records its arguments and performs no disk I/O.
+   *
+   * @param messageClass Class of the messages kept in the file
+   * @param config       Configuration needed later to read the data back
+   * @param bufferSize   Size of the I/O buffers used for the file
+   * @param fileName     Path of the backing file
+   */
+  public SequentialFileMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      int bufferSize,
+      String fileName) {
+    this.file = new File(fileName);
+    this.bufferSize = bufferSize;
+    this.config = config;
+    this.messageClass = messageClass;
+  }
+
+  /**
+   * Writes the given messages into this store's file, replacing any
+   * previous file contents. Layout: the number of vertices, then, for each
+   * vertex in map order, its id, its message count and its messages.
+   *
+   * @param messageMap Serialized messages keyed by destination vertex id
+   * @throws java.io.IOException if the file cannot be created or written
+   */
+  public void addMessages(NavigableMap<I, ExtendedDataOutput> messageMap)
+    throws IOException {
+    boolean debug = LOG.isDebugEnabled();
+    // Always start from a fresh, empty file
+    if (file.exists()) {
+      if (debug) {
+        LOG.debug("addMessages: Deleting " + file);
+      }
+      file.delete();
+    }
+    file.createNewFile();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addMessages: Creating " + file);
+    }
+
+    DataOutputStream fileOut = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file), bufferSize));
+    try {
+      fileOut.writeInt(messageMap.size());
+
+      // A NavigableMap iterates its entries in ascending key order
+      for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
+          messageMap.entrySet()) {
+        I targetId = vertexEntry.getKey();
+        targetId.write(fileOut);
+        ExtendedDataOutput serialized = vertexEntry.getValue();
+        Iterable<M> vertexMessages = new MessagesIterable<M>(config,
+            messageClass, serialized.getByteArray(), 0, serialized.getPos());
+        // The count precedes the messages so readers know how many follow
+        int count = Iterables.size(vertexMessages);
+        fileOut.writeInt(count);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("addMessages: For vertex id " + targetId +
+              ", messages = " + count + " to file " + file);
+        }
+        for (M msg : vertexMessages) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("addMessages: Wrote " + msg + " to " + file);
+          }
+          msg.write(fileOut);
+        }
+      }
+    } finally {
+      fileOut.close();
+    }
+  }
+
+  /**
+   * Returns the messages stored for {@code vertexId}. The backing file is
+   * scanned forward only, so lookups are expected to be made in increasing
+   * vertex id order; out-of-order lookups may miss messages.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Messages for the selected vertex, or an empty iterable if the
+   *         vertex has no messages here or ids were requested out of order
+   * @throws IOException if reading the backing file fails
+   */
+  public Iterable<M> getVertexMessages(I vertexId) throws
+      IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
+          " (currently " + currentVertexId + ") from " + file);
+    }
+    // (Re)open the file when no pass is in progress: either this is the
+    // first lookup, or a previous pass consumed every vertex and closed it.
+    if (in == null) {
+      startReading();
+    }
+
+    // Walk forward past every stored vertex id smaller than the one asked for.
+    I candidate = getCurrentVertexId();
+    while (candidate != null) {
+      if (vertexId.compareTo(candidate) <= 0) {
+        break;
+      }
+      candidate = getNextVertexId();
+    }
+
+    // The scan stopped at the first id >= vertexId (or ran out of ids); only
+    // an exact match has messages to return.
+    if (candidate == null || vertexId.compareTo(candidate) != 0) {
+      return EmptyIterable.get();
+    }
+    return readMessagesForCurrentVertex();
+  }
+
+  /**
+   * Clears all resources used by this store: closes the reader if one is
+   * open and deletes the backing file.
+   *
+   * @throws IOException if closing the reader fails
+   */
+  public void clearAll() throws IOException {
+    endReading();
+    // File.delete() reports failure only through its return value. A missing
+    // file is fine (nothing was ever written); a file that survives the
+    // delete is worth reporting instead of silently leaking disk space.
+    if (!file.delete() && file.exists()) {
+      LOG.warn("clearAll: Failed to delete " + file);
+    }
+  }
+
+  /**
+   * Serializes the backing file: its length in bytes as a long, followed by
+   * the raw file contents copied in chunks of up to {@code bufferSize} bytes.
+   *
+   * @param out Output to write the file length and contents to
+   * @throws IOException if the file cannot be read or the output fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    long totalBytes = file.length();
+    out.writeLong(totalBytes);
+    FileInputStream source = new FileInputStream(file);
+    try {
+      byte[] chunk = new byte[bufferSize];
+      // read() returns -1 at end of stream; any other count is copied as-is.
+      int count = source.read(chunk);
+      while (count >= 0) {
+        out.write(chunk, 0, count);
+        count = source.read(chunk);
+      }
+    } finally {
+      source.close();
+    }
+  }
+
+  /**
+   * Restores the backing file from data produced by {@code write}: a long
+   * byte count followed by exactly that many raw file bytes. Any existing
+   * contents of the file are overwritten.
+   *
+   * @param in Input to read the serialized file from
+   * @throws IOException if reading from {@code in} or writing the file fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    FileOutputStream output = new FileOutputStream(file);
+    try {
+      long fileLength = in.readLong();
+      byte[] buffer = new byte[bufferSize];
+      for (long position = 0; position < fileLength; position += bufferSize) {
+        int bytes = (int) Math.min(bufferSize, fileLength - position);
+        in.readFully(buffer, 0, bytes);
+        // Only the first 'bytes' entries are valid (the last chunk is usually
+        // short); writing the whole buffer would append stale bytes and
+        // corrupt the restored file.
+        output.write(buffer, 0, bytes);
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  /**
+   * Opens the backing file for a fresh sequential pass and reads the vertex
+   * count stored as the first int of the file.
+   *
+   * @throws IOException if the file cannot be opened or the count read
+   */
+  private void startReading() throws IOException {
+    currentVertexId = null;
+    FileInputStream rawInput = new FileInputStream(file);
+    in = new DataInputStream(new BufferedInputStream(rawInput, bufferSize));
+    verticesLeft = in.readInt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startReading: File " + file + " with " +
+          verticesLeft + " vertices left");
+    }
+  }
+
+  /**
+   * Returns the vertex id the reader is positioned on.
+   * <p/>
+   * If an id has been read but its messages have not been consumed yet, that
+   * id is returned; otherwise the next id is read from the file.
+   *
+   * @return Current vertex id, or null when no vertices remain
+   * @throws IOException if reading the next vertex id fails
+   */
+  private I getCurrentVertexId() throws IOException {
+    return (currentVertexId == null) ? getNextVertexId() : currentVertexId;
+  }
+
+  /**
+   * Advances to and returns the next vertex id in the file.
+   * <p/>
+   * If the reader is positioned on a vertex whose messages were not read,
+   * those messages are consumed first so the stream lands on the next id.
+   *
+   * @return Next vertex id, or null when no vertices remain
+   * @throws IOException if reading from the file fails
+   */
+  private I getNextVertexId() throws IOException {
+    boolean hasUnreadMessages = currentVertexId != null;
+    if (hasUnreadMessages) {
+      // Result discarded: this only moves the stream past those messages.
+      readMessagesForCurrentVertex();
+    }
+    if (verticesLeft != 0) {
+      currentVertexId = config.createVertexId();
+      currentVertexId.readFields(in);
+      return currentVertexId;
+    }
+    return null;
+  }
+
+  /**
+   * Reads the message count and every message of the current vertex, then
+   * marks that vertex as done (which closes the file after the last one).
+   *
+   * @return Messages for current vertex
+   * @throws IOException if reading the message count or closing the file
+   *         fails; a failure reading an individual message is rethrown as
+   *         an IllegalStateException
+   */
+  private Collection<M> readMessagesForCurrentVertex() throws IOException {
+    int total = in.readInt();
+    List<M> result = Lists.newArrayListWithCapacity(total);
+    int index = 0;
+    while (index < total) {
+      M message = ReflectionUtils.newInstance(messageClass);
+      try {
+        message.readFields(in);
+      } catch (IOException e) {
+        throw new IllegalStateException("readMessagesForCurrentVertex: " +
+            "Failed to read message from " + index + " of " +
+            total + " for vertex id " + currentVertexId + " from " +
+            file, e);
+      }
+      result.add(message);
+      index++;
+    }
+    currentVertexDone();
+    return result;
+  }
+
+  /**
+   * Marks the current vertex as fully consumed and closes the file once no
+   * vertices remain.
+   *
+   * @throws IOException if closing the file fails
+   */
+  private void currentVertexDone() throws IOException {
+    currentVertexId = null;
+    verticesLeft -= 1;
+    boolean exhausted = verticesLeft == 0;
+    if (exhausted) {
+      endReading();
+    }
+  }
+
+  /**
+   * Ends the current sequential pass by closing the input stream, if one is
+   * open; does nothing else when no stream is open.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  private void endReading() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("endReading: Stopped reading " + file);
+    }
+    if (in == null) {
+      return;
+    }
+    in.close();
+    in = null;
+  }
+
+  /**
+   * Creates a factory producing {@link SequentialFileMessageStore} instances,
+   * each backed by its own file under the configured messages directories.
+   *
+   * @param config Hadoop configuration
+   * @param <I>    Vertex id
+   * @param <M>    Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    Factory<I, M> factory = new Factory<I, M>(config);
+    return factory;
+  }
+
+  /**
+   * Factory for {@link SequentialFileMessageStore}.
+   * <p/>
+   * Each created store gets its own file named {@code messages-<n>}, where n
+   * comes from a per-factory counter; files are spread round-robin over the
+   * configured message directories (shuffled once at construction).
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable>
+      implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Directories in which we'll keep necessary files */
+    private final String[] directories;
+    /** Buffer size to use when reading and writing */
+    private final int bufferSize;
+    /** Counter for created message stores */
+    private final AtomicInteger storeCounter;
+
+    /**
+     * Constructor. Builds a per-job, per-task subdirectory path under each
+     * configured messages directory and attempts to create it.
+     *
+     * @param config Hadoop configuration
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+      this.config = config;
+      String jobId = config.get("mapred.job.id", "Unknown Job");
+      int taskId   = config.getTaskPartition();
+      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
+      Collections.shuffle(userPaths);
+      directories = new String[userPaths.size()];
+      int i = 0;
+      for (String path : userPaths) {
+        String directory = path + File.separator + jobId + File.separator +
+            taskId + File.separator;
+        directories[i++] = directory;
+        new File(directory).mkdirs();
+      }
+      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
+      storeCounter = new AtomicInteger();
+    }
+
+    @Override
+    public SequentialFileMessageStore<I, M> newStore(Class<M> messageClass) {
+      if (directories.length == 0) {
+        throw new IllegalStateException("newStore: No directories " +
+            "configured for out-of-core messages");
+      }
+      // Clear the sign bit rather than using Math.abs: after the counter
+      // wraps, Math.abs(Integer.MIN_VALUE) is still negative and would yield
+      // a negative directory index.
+      int idx = storeCounter.getAndIncrement() & Integer.MAX_VALUE;
+      String fileName =
+          directories[idx % directories.length] + "messages-" + idx;
+      return new SequentialFileMessageStore<I, M>(messageClass, config,
+          bufferSize, fileName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
new file mode 100644
index 0000000..7039378
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes for storing messages out of core.
+ */
+package org.apache.giraph.comm.messages.out_of_core;

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
index 3c798a9..8721756 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of communication related objects, IPC service.
+ * Package of classes for storing messages.
  */
 package org.apache.giraph.comm.messages;

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 2adf19d..d786db5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -26,7 +26,7 @@ import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
@@ -215,7 +215,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   private void sendPartitionMessages(WorkerInfo workerInfo,
                                      Partition<I, V, E> partition) {
     final int partitionId = partition.getId();
-    MessageStoreByPartition<I, Writable> messageStore =
+    MessageStore<I, Writable> messageStore =
         serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index b457038..3473de1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,14 +21,12 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
@@ -107,7 +105,7 @@ public class NettyWorkerServer<I extends WritableComparable,
    *
    * @return Message store factory
    */
-  private MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   createMessageStoreFactory() {
     boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
     if (!useOutOfCoreMessaging) {
@@ -118,12 +116,13 @@ public class NettyWorkerServer<I extends WritableComparable,
         LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
             "maxMessagesInMemory = " + maxMessagesInMemory);
       }
-      MessageStoreFactory<I, Writable, BasicMessageStore<I, Writable>>
+      MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
           fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
-      MessageStoreFactory<I, Writable, FlushableMessageStore<I, Writable>>
+      MessageStoreFactory<I, Writable,
+          PartitionDiskBackedMessageStore<I, Writable>>
           partitionStoreFactory =
-          DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
-      return DiskBackedMessageStoreByPartition.newFactory(service,
+          PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory);
+      return DiskBackedMessageStore.newFactory(service,
           maxMessagesInMemory, partitionStoreFactory);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 6fdcfb0..a9bf3fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -19,7 +19,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.SimpleVertexWriter;
@@ -81,7 +81,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   /** Thread-safe queue of all partition ids */
   private final BlockingQueue<Integer> partitionIdQueue;
   /** Message store */
-  private final MessageStoreByPartition<I, M1> messageStore;
+  private final MessageStore<I, M1> messageStore;
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Worker (for NettyWorkerClientRequestProcessor) */
@@ -111,7 +111,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
    */
   public ComputeCallable(
       Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
-      MessageStoreByPartition<I, M1> messageStore,
+      MessageStore<I, M1> messageStore,
       BlockingQueue<Integer> partitionIdQueue,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       CentralizedServiceWorker<I, V, E> serviceWorker) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 435dd87..b32c21b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -21,7 +21,7 @@ package org.apache.giraph.graph;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.BspServiceMaster;
@@ -253,7 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       graphState = checkSuperstepRestarted(superstep, graphState);
       prepareForSuperstep(graphState);
       context.progress();
-      MessageStoreByPartition<I, Writable> messageStore =
+      MessageStore<I, Writable> messageStore =
         serviceWorker.getServerData().getCurrentMessageStore();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
       int numThreads = Math.min(numComputeThreads, numPartitions);
@@ -691,7 +691,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
       List<PartitionStats> partitionStatsList,
       final GraphState graphState,
-      final MessageStoreByPartition<I, Writable> messageStore,
+      final MessageStore<I, Writable> messageStore,
       int numPartitions,
       int numThreads) {
     final BlockingQueue<Integer> computePartitionIdQueue =

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 5c69161..35e6362 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -102,7 +102,7 @@ public class RequestFailureTest {
   private void checkResult(int numRequests) throws IOException {
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getIncomingMessageStore().getDestinationVertices();
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 7016572..c8c09df 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -168,7 +168,7 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getIncomingMessageStore().getDestinationVertices();
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index 4e8041a..e270816 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -24,14 +24,12 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.BasicMessageStore;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -138,42 +136,25 @@ public class TestMessageStores {
     return allMessages;
   }
 
-  /**
-   * Used for testing only
-   */
-  private static class InputMessageStore extends
-      ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> {
-
-    /**
-     * Constructor
-     *
-     * @param service Service worker
-     * @param config  Hadoop configuration
-     */
-    InputMessageStore(
-        CentralizedServiceWorker<IntWritable, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
-        Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
-      super(IntWritable.class, service, config);
-      // Adds all the messages to the store
-      for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
-          inputMap.entrySet()) {
-        int partitionId = getPartitionId(entry.getKey());
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>
-            byteArrayVertexIdMessages =
-            new ByteArrayVertexIdMessages<IntWritable,
-                IntWritable>(IntWritable.class);
-        byteArrayVertexIdMessages.setConf(config);
-        byteArrayVertexIdMessages.initialize();
-        for (IntWritable message : entry.getValue()) {
-          byteArrayVertexIdMessages.add(entry.getKey(), message);
-        }
-        try {
-          addPartitionMessages(partitionId, byteArrayVertexIdMessages);
-        } catch (IOException e) {
-          throw new IllegalStateException("Got IOException", e);
-        }
+  private static void addMessages(
+      MessageStore<IntWritable, IntWritable> messageStore,
+      CentralizedServiceWorker<IntWritable, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
+      Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
+    for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
+        inputMap.entrySet()) {
+      int partitionId =
+          service.getVertexPartitionOwner(entry.getKey()).getPartitionId();
+      ByteArrayVertexIdMessages<IntWritable, IntWritable>
+          byteArrayVertexIdMessages =
+          new ByteArrayVertexIdMessages<IntWritable,
+              IntWritable>(IntWritable.class);
+      byteArrayVertexIdMessages.setConf(config);
+      byteArrayVertexIdMessages.initialize();
+      for (IntWritable message : entry.getValue()) {
+        byteArrayVertexIdMessages.add(entry.getKey(), message);
       }
+      messageStore.addPartitionMessages(partitionId, byteArrayVertexIdMessages);
     }
   }
 
@@ -184,8 +165,7 @@ public class TestMessageStores {
     for (int n = 0; n < testData.numTimes; n++) {
       SortedMap<IntWritable, Collection<IntWritable>> batch =
           createRandomMessages(testData);
-      messageStore.addMessages(new InputMessageStore(service, config,
-          batch));
+      addMessages(messageStore, service, config, batch);
       for (Entry<IntWritable, Collection<IntWritable>> entry :
           batch.entrySet()) {
         if (messages.containsKey(entry.getKey())) {
@@ -200,19 +180,24 @@ public class TestMessageStores {
   private <I extends WritableComparable, M extends Writable> boolean
   equalMessages(
       MessageStore<I, M> messageStore,
-      Map<I, Collection<M>> expectedMessages) throws IOException {
-    TreeSet<I> vertexIds = Sets.newTreeSet();
-    Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
-    for (I vertexId : vertexIds) {
-      Iterable<M> expected = expectedMessages.get(vertexId);
-      if (expected == null) {
-        return false;
-      }
-      Iterable<M> actual = messageStore.getVertexMessages(vertexId);
-      if (!CollectionUtils.isEqual(expected, actual)) {
-        System.err.println("equalMessages: For vertexId " + vertexId +
-            " expected " + expected + ", but got " + actual);
-        return false;
+      Map<I, Collection<M>> expectedMessages,
+      TestData testData) throws IOException {
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      TreeSet<I> vertexIds = Sets.newTreeSet();
+      Iterables.addAll(vertexIds,
+          messageStore.getPartitionDestinationVertices(partitionId));
+      for (I vertexId : vertexIds) {
+        Iterable<M> expected = expectedMessages.get(vertexId);
+        if (expected == null) {
+          return false;
+        }
+        Iterable<M> actual = messageStore.getVertexMessages(vertexId);
+        if (!CollectionUtils.isEqual(expected, actual)) {
+          System.err.println("equalMessages: For vertexId " + vertexId +
+              " expected " + expected + ", but got " + actual);
+          return false;
+        }
       }
     }
     return true;
@@ -220,7 +205,7 @@ public class TestMessageStores {
 
   private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
       MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
-      S messageStore) throws IOException {
+      S messageStore, TestData testData) throws IOException {
     File file = new File(directory, "messageStoreTest");
     if (file.exists()) {
       file.delete();
@@ -228,14 +213,20 @@ public class TestMessageStores {
     file.createNewFile();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         (new FileOutputStream(file))));
-    messageStore.write(out);
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      messageStore.writePartition(out, partitionId);
+    }
     out.close();
 
     messageStore = messageStoreFactory.newStore(IntWritable.class);
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(
         (new FileInputStream(file))));
-    messageStore.readFields(in);
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      messageStore.readFieldsForPartition(in, partitionId);
+    }
     in.close();
     file.delete();
 
@@ -250,10 +241,10 @@ public class TestMessageStores {
         new TreeMap<IntWritable, Collection<IntWritable>>();
     S messageStore = messageStoreFactory.newStore(IntWritable.class);
     putNTimes(messageStore, messages, testData);
-    assertTrue(equalMessages(messageStore, messages));
+    assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();
-    messageStore = doCheckpoint(messageStoreFactory, messageStore);
-    assertTrue(equalMessages(messageStore, messages));
+    messageStore = doCheckpoint(messageStoreFactory, messageStore, testData);
+    assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();
   }
 
@@ -270,31 +261,18 @@ public class TestMessageStores {
   }
 
   @Test
-  public void testDiskBackedMessageStore() {
-    try {
-      MessageStoreFactory<IntWritable, IntWritable,
-          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
-          SequentialFileMessageStore.newFactory(config);
-      MessageStoreFactory<IntWritable, IntWritable,
-          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
-          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
-      testMessageStore(diskStoreFactory, testData);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
   public void testDiskBackedMessageStoreByPartition() {
     try {
       MessageStoreFactory<IntWritable, IntWritable,
-          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+          SequentialFileMessageStore<IntWritable, IntWritable>>
+          fileStoreFactory =
           SequentialFileMessageStore.newFactory(config);
       MessageStoreFactory<IntWritable, IntWritable,
-          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
-          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
-      testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service,
-          testData.maxMessagesInMemory, diskStoreFactory), testData);
+          PartitionDiskBackedMessageStore<IntWritable, IntWritable>>
+          partitionStoreFactory =
+          PartitionDiskBackedMessageStore.newFactory(config, fileStoreFactory);
+      testMessageStore(DiskBackedMessageStore.newFactory(service,
+          testData.maxMessagesInMemory, partitionStoreFactory), testData);
     } catch (IOException e) {
       e.printStackTrace();
     }


Mime
View raw message