giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [13/47] git commit: updated refs/heads/release-1.1 to 4c139ee
Date Sun, 26 Oct 2014 01:21:56 GMT
GIRAPH-912: Support succinct representation of messages in messagestores (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f31e9a32
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f31e9a32
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f31e9a32

Branch: refs/heads/release-1.1
Commit: f31e9a328d3b4f10906a10a8e69d2ae515e3aba0
Parents: 61cb37e
Author: Pavan Kumar <pavanka@fb.com>
Authored: Wed Jul 9 17:08:48 2014 -0700
Committer: Pavan Kumar <pavanka@fb.com>
Committed: Wed Jul 9 17:08:48 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/comm/SendMessageToAllCache.java      | 308 -------------------
 .../giraph/comm/SendOneMessageToManyCache.java  | 306 ++++++++++++++++++
 .../java/org/apache/giraph/comm/ServerData.java |   2 +
 .../messages/AbstractListPerVertexStore.java    | 103 +++++++
 .../ByteArrayMessagesPerVertexStore.java        |  17 +-
 .../messages/InMemoryMessageStoreFactory.java   | 114 +++++--
 .../messages/MessageEncodeAndStoreType.java     |  59 ++++
 .../giraph/comm/messages/MessageStore.java      |  15 +
 .../comm/messages/OneMessagePerVertexStore.java |   5 +
 .../messages/PointerListMessagesIterable.java   | 105 +++++++
 .../messages/PointerListPerVertexStore.java     | 137 +++++++++
 .../comm/messages/SimpleMessageStore.java       |   4 +
 .../out_of_core/DiskBackedMessageStore.java     |   9 +
 .../primitives/IntByteArrayMessageStore.java    |   9 +
 .../primitives/IntFloatMessageStore.java        |   9 +
 .../primitives/LongByteArrayMessageStore.java   | 241 ---------------
 .../primitives/LongDoubleMessageStore.java      |   9 +
 .../long_id/LongAbstractListMessageStore.java   | 164 ++++++++++
 .../long_id/LongAbstractMessageStore.java       | 132 ++++++++
 .../long_id/LongByteArrayMessageStore.java      | 172 +++++++++++
 .../long_id/LongPointerListMessageStore.java    | 129 ++++++++
 .../primitives/long_id/package-info.java        |  22 ++
 .../NettyWorkerClientRequestProcessor.java      |   8 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |   2 +-
 .../giraph/comm/requests/RequestType.java       |   6 +-
 .../SendWorkerOneMessageToManyRequest.java      | 156 ++++++++++
 .../SendWorkerOneToAllMessagesRequest.java      | 155 ----------
 .../apache/giraph/conf/GiraphConfiguration.java |  17 +-
 .../org/apache/giraph/conf/GiraphConstants.java |  20 +-
 .../utils/ByteArrayOneMessageToManyIds.java     | 105 +++++++
 .../giraph/utils/ByteArrayOneToAllMessages.java | 168 ----------
 .../utils/ByteStructVertexIdDataIterator.java   |   9 +
 .../ByteStructVertexIdMessageIterator.java      |  10 +
 .../utils/ExtendedByteArrayOutputBuffer.java    | 155 ++++++++++
 .../apache/giraph/utils/ExtendedDataOutput.java |   1 -
 .../utils/OneMessageToManyIdsIterator.java      | 143 +++++++++
 .../apache/giraph/utils/UnsafeArrayReads.java   |   2 +-
 .../org/apache/giraph/utils/UnsafeReads.java    |   2 +-
 .../utils/UnsafeReusableByteArrayInput.java     |  46 +++
 .../giraph/utils/VertexIdDataIterator.java      |   7 +
 .../giraph/utils/VertexIdMessageIterator.java   |  14 +
 .../org/apache/giraph/comm/RequestTest.java     |  14 +-
 .../TestLongDoublePrimitiveMessageStores.java   |   2 +-
 44 files changed, 2160 insertions(+), 955 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 13dfcd7..0263749 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-912: Support succinct representation of messages in messagestores (pavanka)
+
   GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
 
   GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
deleted file mode 100644
index 60858ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Aggregates the messages to be sent to workers so they can be sent
- * in bulk.  Not thread-safe.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SendMessageToAllCache<I extends WritableComparable,
-    M extends Writable> extends SendMessageCache<I, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendMessageToAllCache.class);
-  /** Cache serialized messages for each worker */
-  private final ByteArrayOneToAllMessages<I, M>[] oneToAllMsgCache;
-  /** Tracking one-to-all message sizes for each worker */
-  private final int[] oneToAllMsgSizes;
-  /** Reused byte array to serialize target ids on each worker */
-  private final ExtendedDataOutput[] idSerializer;
-  /** Reused int array to count target id distribution */
-  private final int[] idCounter;
-  /**
-   * Reused int array to record the partition id
-   * of the first target vertex id found on the worker.
-   */
-  private final int[] firstPartitionMap;
-  /** The WorkerInfo list */
-  private final WorkerInfo[] workerInfoList;
-
-  /**
-   * Constructor
-   *
-   * @param conf Giraph configuration
-   * @param serviceWorker Service worker
-   * @param processor NettyWorkerClientRequestProcessor
-   * @param maxMsgSize Max message size sent to a worker
-   */
-  public SendMessageToAllCache(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceWorker<?, ?, ?> serviceWorker,
-      NettyWorkerClientRequestProcessor<I, ?, ?> processor,
-      int maxMsgSize) {
-    super(conf, serviceWorker, processor, maxMsgSize);
-    int numWorkers = getNumWorkers();
-    oneToAllMsgCache = new ByteArrayOneToAllMessages[numWorkers];
-    oneToAllMsgSizes = new int[numWorkers];
-    idSerializer = new ExtendedDataOutput[numWorkers];
-    // InitialBufferSizes is alo initialized based on the number of workers.
-    // As a result, initialBufferSizes is the same as idSerializer in length
-    int initialBufferSize = 0;
-    for (int i = 0; i < this.idSerializer.length; i++) {
-      initialBufferSize = getSendWorkerInitialBufferSize(i);
-      if (initialBufferSize > 0) {
-        // InitialBufferSizes is from super class.
-        // Each element is for one worker.
-        idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize);
-      }
-    }
-    idCounter = new int[numWorkers];
-    firstPartitionMap = new int[numWorkers];
-    // Get worker info list.
-    workerInfoList = new WorkerInfo[numWorkers];
-    // Remember there could be null in the array.
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      workerInfoList[workerInfo.getTaskId()] = workerInfo;
-    }
-  }
-
-  /**
-   * Reset ExtendedDataOutput array for id serialization
-   * in next "one-to-all" message sending.
-   */
-  private void resetIdSerializers() {
-    for (int i = 0; i < this.idSerializer.length; i++) {
-      if (idSerializer[i] != null) {
-        idSerializer[i].reset();
-      }
-    }
-  }
-
-  /**
-   * Reset id counter for next "one-to-all" message sending.
-   */
-  private void resetIdCounter() {
-    Arrays.fill(idCounter, 0);
-  }
-
-  /**
-   * Add message with multiple ids to
-   * one-to-all message cache.
-   *
-   * @param workerInfo The remote worker destination
-   * @param ids A byte array to hold serialized vertex ids
-   * @param idPos The end position of ids
-   *              information in the byte array above
-   * @param count The number of target ids
-   * @param message Message to send to remote worker
-   * @return The size of messages for the worker.
-   */
-  private int addOneToAllMessage(
-    WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) {
-    // Get the data collection
-    ByteArrayOneToAllMessages<I, M> workerData =
-      oneToAllMsgCache[workerInfo.getTaskId()];
-    if (workerData == null) {
-      workerData = new ByteArrayOneToAllMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
-      workerData.setConf(getConf());
-      workerData.initialize(getSendWorkerInitialBufferSize(
-        workerInfo.getTaskId()));
-      oneToAllMsgCache[workerInfo.getTaskId()] = workerData;
-    }
-    workerData.add(ids, idPos, count, message);
-    // Update the size of cached, outgoing data per worker
-    oneToAllMsgSizes[workerInfo.getTaskId()] =
-      workerData.getSize();
-    return oneToAllMsgSizes[workerInfo.getTaskId()];
-  }
-
-  /**
-   * Gets the one-to-all
-   * messages for a worker and removes it from the cache.
-   * Here the ByteArrayOneToAllMessages returned could be null.
-   * But when invoking this method, we also check if the data size sent
-   * to this worker is above the threshold. Therefore, it doesn't matter
-   * if the result is null or not.
-   *
-   * @param workerInfo The target worker where one-to-all messages
-   *                   go to.
-   * @return ByteArrayOneToAllMessages that belong to the workerInfo
-   */
-  private ByteArrayOneToAllMessages<I, M>
-  removeWorkerOneToAllMessages(WorkerInfo workerInfo) {
-    ByteArrayOneToAllMessages<I, M> workerData =
-      oneToAllMsgCache[workerInfo.getTaskId()];
-    if (workerData != null) {
-      oneToAllMsgCache[workerInfo.getTaskId()] = null;
-      oneToAllMsgSizes[workerInfo.getTaskId()] = 0;
-    }
-    return workerData;
-  }
-
-  /**
-   * Gets all the one-to-all
-   * messages and removes them from the cache.
-   *
-   * @return All vertex messages for all workers
-   */
-  private PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>
-  removeAllOneToAllMessages() {
-    PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>> allData =
-      new PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>();
-    allData.initialize(oneToAllMsgCache.length);
-    for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
-      ByteArrayOneToAllMessages<I, M> workerData =
-        removeWorkerOneToAllMessages(workerInfo);
-      if (workerData != null && !workerData.isEmpty()) {
-        allData.add(workerInfo, workerData);
-      }
-    }
-    return allData;
-  }
-
-  @Override
-  public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
-    // This is going to be reused through every message sending
-    resetIdSerializers();
-    resetIdCounter();
-    // Count messages
-    int currentMachineId = 0;
-    PartitionOwner owner = null;
-    WorkerInfo workerInfo = null;
-    I vertexId = null;
-    while (vertexIdIterator.hasNext()) {
-      vertexId = vertexIdIterator.next();
-      owner = getServiceWorker().getVertexPartitionOwner(vertexId);
-      workerInfo = owner.getWorkerInfo();
-      currentMachineId = workerInfo.getTaskId();
-      // Serialize this target vertex id
-      try {
-        vertexId.write(idSerializer[currentMachineId]);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-          "Failed to serialize the target vertex id.");
-      }
-      idCounter[currentMachineId]++;
-      // Record the first partition id in the worker which message send to.
-      // If idCounter shows there is only one target on this worker
-      // then this is the partition number of the target vertex.
-      if (idCounter[currentMachineId] == 1) {
-        firstPartitionMap[currentMachineId] = owner.getPartitionId();
-      }
-    }
-    // Add the message to the cache
-    int idSerializerPos = 0;
-    int workerMessageSize = 0;
-    byte[] serializedId  = null;
-    WritableRequest writableRequest = null;
-    for (int i = 0; i < idCounter.length; i++) {
-      if (idCounter[i] == 1) {
-        serializedId = idSerializer[i].getByteArray();
-        idSerializerPos = idSerializer[i].getPos();
-        // Add the message to the cache
-        workerMessageSize = addMessage(workerInfoList[i],
-          firstPartitionMap[i], serializedId, idSerializerPos, message);
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("sendMessageToAllRequest: Send bytes (" +
-            message.toString() + ") to one target in  worker " +
-            workerInfoList[i]);
-        }
-        ++totalMsgsSentInSuperstep;
-        if (workerMessageSize >= maxMessagesSizePerWorker) {
-          PairList<Integer, VertexIdMessages<I, M>>
-            workerMessages = removeWorkerMessages(workerInfoList[i]);
-          writableRequest =
-            new SendWorkerMessagesRequest<I, M>(workerMessages);
-          totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
-          clientProcessor.doRequest(workerInfoList[i], writableRequest);
-          // Notify sending
-          getServiceWorker().getGraphTaskManager().notifySentMessages();
-        }
-      } else if (idCounter[i] > 1) {
-        serializedId = idSerializer[i].getByteArray();
-        idSerializerPos = idSerializer[i].getPos();
-        workerMessageSize = addOneToAllMessage(
-          workerInfoList[i], serializedId, idSerializerPos, idCounter[i],
-          message);
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("sendMessageToAllRequest: Send bytes (" +
-            message.toString() + ") to all targets in worker" +
-            workerInfoList[i]);
-        }
-        totalMsgsSentInSuperstep += idCounter[i];
-        if (workerMessageSize >= maxMessagesSizePerWorker) {
-          ByteArrayOneToAllMessages<I, M> workerOneToAllMessages =
-            removeWorkerOneToAllMessages(workerInfoList[i]);
-          writableRequest =
-            new SendWorkerOneToAllMessagesRequest<I, M>(
-              workerOneToAllMessages, getConf());
-          totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
-          clientProcessor.doRequest(workerInfoList[i], writableRequest);
-          // Notify sending
-          getServiceWorker().getGraphTaskManager().notifySentMessages();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void flush() {
-    super.flush();
-    PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>
-    remainingOneToAllMessageCache =
-      removeAllOneToAllMessages();
-    PairList<WorkerInfo,
-    ByteArrayOneToAllMessages<I, M>>.Iterator
-    oneToAllMsgIterator = remainingOneToAllMessageCache.getIterator();
-    while (oneToAllMsgIterator.hasNext()) {
-      oneToAllMsgIterator.next();
-      WritableRequest writableRequest =
-        new SendWorkerOneToAllMessagesRequest<I, M>(
-          oneToAllMsgIterator.getCurrentSecond(), getConf());
-      totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
-      clientProcessor.doRequest(
-        oneToAllMsgIterator.getCurrentFirst(), writableRequest);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
new file mode 100644
index 0000000..c67a20b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Aggregates the messages to be sent to workers so they can be sent
+ * in bulk.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@NotThreadSafe
+@SuppressWarnings("unchecked")
+public class SendOneMessageToManyCache<I extends WritableComparable,
+  M extends Writable> extends SendMessageCache<I, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendOneMessageToManyCache.class);
+  /** Cache serialized one to many messages for each worker */
+  private final ByteArrayOneMessageToManyIds<I, M>[] msgVidsCache;
+  /** Tracking message-vertexIds sizes for each worker */
+  private final int[] msgVidsSizes;
+  /** Reused byte array to serialize target ids on each worker */
+  private final ExtendedDataOutput[] idSerializer;
+  /** Reused int array to count target id distribution */
+  private final int[] idCounter;
+  /**
+   * Reused int array to record the partition id
+   * of the first target vertex id found on the worker.
+   */
+  private final int[] firstPartitionMap;
+  /** The WorkerInfo list */
+  private final WorkerInfo[] workerInfoList;
+
+  /**
+   * Constructor
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   * @param processor NettyWorkerClientRequestProcessor
+   * @param maxMsgSize Max message size sent to a worker
+   */
+  public SendOneMessageToManyCache(ImmutableClassesGiraphConfiguration conf,
+    CentralizedServiceWorker<?, ?, ?> serviceWorker,
+    NettyWorkerClientRequestProcessor<I, ?, ?> processor,
+    int maxMsgSize) {
+    super(conf, serviceWorker, processor, maxMsgSize);
+    int numWorkers = getNumWorkers();
+    msgVidsCache = new ByteArrayOneMessageToManyIds[numWorkers];
+    msgVidsSizes = new int[numWorkers];
+    idSerializer = new ExtendedDataOutput[numWorkers];
+    // InitialBufferSizes is also initialized based on the number of workers.
+    // As a result, initialBufferSizes is the same as idSerializer in length
+    int initialBufferSize = 0;
+    for (int i = 0; i < this.idSerializer.length; i++) {
+      initialBufferSize = getSendWorkerInitialBufferSize(i);
+      if (initialBufferSize > 0) {
+        // InitialBufferSizes is from super class.
+        // Each element is for one worker.
+        idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize);
+      }
+    }
+    idCounter = new int[numWorkers];
+    firstPartitionMap = new int[numWorkers];
+    // Get worker info list.
+    workerInfoList = new WorkerInfo[numWorkers];
+    // Remember there could be null in the array.
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      workerInfoList[workerInfo.getTaskId()] = workerInfo;
+    }
+  }
+
+  /**
+   * Reset ExtendedDataOutput array for id serialization
+   * in next message-Vids encoding
+   */
+  private void resetIdSerializers() {
+    for (int i = 0; i < this.idSerializer.length; i++) {
+      if (idSerializer[i] != null) {
+        idSerializer[i].reset();
+      }
+    }
+  }
+
+  /**
+   * Reset id counter for next message-vertexIds encoding
+   */
+  private void resetIdCounter() {
+    Arrays.fill(idCounter, 0);
+  }
+
+  /**
+   * Add message with multiple target ids to message cache.
+   *
+   * @param workerInfo The remote worker destination
+   * @param ids A byte array to hold serialized vertex ids
+   * @param idPos The end position of ids
+   *              information in the byte array above
+   * @param count The number of target ids
+   * @param message Message to send to remote worker
+   * @return The size of messages for the worker.
+   */
+  private int addOneToManyMessage(
+    WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) {
+    // Get the data collection
+    ByteArrayOneMessageToManyIds<I, M> workerData =
+      msgVidsCache[workerInfo.getTaskId()];
+    if (workerData == null) {
+      workerData = new ByteArrayOneMessageToManyIds<I, M>(
+        getConf().getOutgoingMessageValueFactory());
+      workerData.setConf(getConf());
+      workerData.initialize(getSendWorkerInitialBufferSize(
+        workerInfo.getTaskId()));
+      msgVidsCache[workerInfo.getTaskId()] = workerData;
+    }
+    workerData.add(ids, idPos, count, message);
+    // Update the size of cached, outgoing data per worker
+    msgVidsSizes[workerInfo.getTaskId()] =
+      workerData.getSize();
+    return msgVidsSizes[workerInfo.getTaskId()];
+  }
+
+  /**
+   * Gets the messages + vertexIds for a worker and removes it from the cache.
+   * Here the {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}
+   * returned could be null. But when invoking this method, we also check if
+   * the data size sent to this worker is above the threshold.
+   * Therefore, it doesn't matter if the result is null or not.
+   *
+   * @param workerInfo Target worker to which one message - many ids are sent
+   * @return {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}
+   *         that belong to the workerInfo
+   */
+  private ByteArrayOneMessageToManyIds<I, M>
+  removeWorkerMsgVids(WorkerInfo workerInfo) {
+    ByteArrayOneMessageToManyIds<I, M> workerData =
+      msgVidsCache[workerInfo.getTaskId()];
+    if (workerData != null) {
+      msgVidsCache[workerInfo.getTaskId()] = null;
+      msgVidsSizes[workerInfo.getTaskId()] = 0;
+    }
+    return workerData;
+  }
+
+  /**
+   * Gets all messages - vertexIds and removes them from the cache.
+   *
+   * @return All vertex messages for all workers
+   */
+  private PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
+  removeAllMsgVids() {
+    PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>> allData =
+      new PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>();
+    allData.initialize(msgVidsCache.length);
+    for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
+      ByteArrayOneMessageToManyIds<I, M> workerData =
+        removeWorkerMsgVids(workerInfo);
+      if (workerData != null && !workerData.isEmpty()) {
+        allData.add(workerInfo, workerData);
+      }
+    }
+    return allData;
+  }
+
+  @Override
+  public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
+    // This is going to be reused through every message sending
+    resetIdSerializers();
+    resetIdCounter();
+    // Count messages
+    int currentMachineId = 0;
+    PartitionOwner owner = null;
+    WorkerInfo workerInfo = null;
+    I vertexId = null;
+    while (vertexIdIterator.hasNext()) {
+      vertexId = vertexIdIterator.next();
+      owner = getServiceWorker().getVertexPartitionOwner(vertexId);
+      workerInfo = owner.getWorkerInfo();
+      currentMachineId = workerInfo.getTaskId();
+      // Serialize this target vertex id
+      try {
+        vertexId.write(idSerializer[currentMachineId]);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+          "Failed to serialize the target vertex id.");
+      }
+      idCounter[currentMachineId]++;
+      // Record the first partition id on the worker the message is sent to.
+      // If idCounter shows there is only one target on this worker
+      // then this is the partition number of the target vertex.
+      if (idCounter[currentMachineId] == 1) {
+        firstPartitionMap[currentMachineId] = owner.getPartitionId();
+      }
+    }
+    // Add the message to the cache
+    int idSerializerPos = 0;
+    int workerMessageSize = 0;
+    byte[] serializedId  = null;
+    WritableRequest writableRequest = null;
+    for (int i = 0; i < idCounter.length; i++) {
+      if (idCounter[i] == 1) {
+        serializedId = idSerializer[i].getByteArray();
+        idSerializerPos = idSerializer[i].getPos();
+        // Add the message to the cache
+        workerMessageSize = addMessage(workerInfoList[i],
+          firstPartitionMap[i], serializedId, idSerializerPos, message);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("sendMessageToAllRequest: Send bytes (" +
+            message.toString() + ") to one target in  worker " +
+            workerInfoList[i]);
+        }
+        ++totalMsgsSentInSuperstep;
+        if (workerMessageSize >= maxMessagesSizePerWorker) {
+          PairList<Integer, VertexIdMessages<I, M>>
+            workerMessages = removeWorkerMessages(workerInfoList[i]);
+          writableRequest = new SendWorkerMessagesRequest<>(workerMessages);
+          totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+          clientProcessor.doRequest(workerInfoList[i], writableRequest);
+          // Notify sending
+          getServiceWorker().getGraphTaskManager().notifySentMessages();
+        }
+      } else if (idCounter[i] > 1) {
+        serializedId = idSerializer[i].getByteArray();
+        idSerializerPos = idSerializer[i].getPos();
+        workerMessageSize = addOneToManyMessage(
+            workerInfoList[i], serializedId, idSerializerPos, idCounter[i],
+            message);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("sendMessageToAllRequest: Send bytes (" +
+            message.toString() + ") to all targets in worker" +
+            workerInfoList[i]);
+        }
+        totalMsgsSentInSuperstep += idCounter[i];
+        if (workerMessageSize >= maxMessagesSizePerWorker) {
+          ByteArrayOneMessageToManyIds<I, M> workerMsgVids =
+            removeWorkerMsgVids(workerInfoList[i]);
+          writableRequest =  new SendWorkerOneMessageToManyRequest<>(
+            workerMsgVids, getConf());
+          totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+          clientProcessor.doRequest(workerInfoList[i], writableRequest);
+          // Notify sending
+          getServiceWorker().getGraphTaskManager().notifySentMessages();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void flush() {
+    super.flush();
+    PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
+    remainingMsgVidsCache = removeAllMsgVids();
+    PairList<WorkerInfo,
+        ByteArrayOneMessageToManyIds<I, M>>.Iterator
+    msgIdsIterator = remainingMsgVidsCache.getIterator();
+    while (msgIdsIterator.hasNext()) {
+      msgIdsIterator.next();
+      WritableRequest writableRequest =
+        new SendWorkerOneMessageToManyRequest<>(
+            msgIdsIterator.getCurrentSecond(), getConf());
+      totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+      clientProcessor.doRequest(
+        msgIdsIterator.getCurrentFirst(), writableRequest);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index b3f8733..85bfe04 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -176,6 +176,8 @@ public class ServerData<I extends WritableComparable,
             messageStoreFactory.newStore(conf.getIncomingMessageValueFactory());
     incomingMessageStore =
         messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory());
+    // finalize current message-store before resolving mutations
+    currentMessageStore.finalizeStore();
 
     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
     incomingWorkerToWorkerMessages =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
new file mode 100644
index 0000000..6840f86
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Abstract Implementation of {@link SimpleMessageStore} where
+ * multiple messages are stored per vertex as a list
+ * Used when there is no combiner provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @param <L> List type
+ */
+public abstract class AbstractListPerVertexStore<I extends WritableComparable,
+  M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Message class held in the store
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public AbstractListPerVertexStore(
+    MessageValueFactory<M> messageValueFactory,
+    CentralizedServiceWorker<I, ?, ?> service,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    super(messageValueFactory, service, config);
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get the message list for the iterator's current vertex id. If the
+   * partition map has no list for it yet, a new one from createList() is
+   * inserted, keyed by the id from releaseCurrentVertexId()
+   *
+   * @param iterator vertex id iterator
+   * @return list stored in the partition map for the current vertex id
+   */
+  protected L getOrCreateList(VertexIdIterator<I> iterator) {
+    PartitionOwner owner =
+        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
+    int partitionId = owner.getPartitionId();
+    ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
+    L list = partitionMap.get(iterator.getCurrentVertexId());
+    if (list == null) {
+      L newList = createList();
+      list = partitionMap.putIfAbsent(
+          iterator.releaseCurrentVertexId(), newList);
+      if (list == null) { // no previous mapping, so newList was inserted
+        list = newList;
+      }
+    }
+    return list;
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    ConcurrentMap<I, L> partitionMap =
+        map.get(getPartitionId(vertexId));
+    if (partitionMap == null) {
+      return Collections.<M>emptyList();
+    }
+    L list = partitionMap.get(vertexId);
+    return list == null ? Collections.<M>emptyList() :
+        getMessagesAsIterable(list);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 65939bb..57d255f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -62,6 +62,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     super(messageValueFactory, service, config);
   }
 
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
   /**
    * Get the extended data output for a vertex id from the iterator, creating
    * if necessary.  This method will take ownership of the vertex id from the
@@ -89,12 +94,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
   @Override
   public void addPartitionMessages(
-      int partitionId,
-      VertexIdMessages<I, M> messages) throws IOException {
+    int partitionId, VertexIdMessages<I, M> messages) throws IOException {
     ConcurrentMap<I, DataInputOutput> partitionMap =
         getOrCreatePartitionMap(partitionId);
-    VertexIdMessageBytesIterator<I, M>
-        vertexIdMessageBytesIterator =
+    VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
         messages.getVertexIdMessageBytesIterator();
     // Try to copy the message buffer over rather than
     // doing a deserialization of a message just to know its size.  This
@@ -113,8 +116,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
         }
       }
     } else {
-      VertexIdMessageIterator<I, M>
-          vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+          messages.getVertexIdMessageIterator();
       while (vertexIdMessageIterator.hasNext()) {
         vertexIdMessageIterator.next();
         DataInputOutput dataInputOutput =
@@ -188,7 +191,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @param <M> Message data
    */
   private static class Factory<I extends WritableComparable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index f691d3e..db22503 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -22,8 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
-import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.hadoop.io.DoubleWritable;
@@ -43,6 +45,7 @@ import org.apache.log4j.Logger;
  * @param <I> Vertex id
  * @param <M> Message data
  */
+@SuppressWarnings("unchecked")
 public class InMemoryMessageStoreFactory<I extends WritableComparable,
     M extends Writable>
     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
@@ -51,9 +54,9 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       Logger.getLogger(InMemoryMessageStoreFactory.class);
 
   /** Service worker */
-  private CentralizedServiceWorker<I, ?, ?> service;
+  protected CentralizedServiceWorker<I, ?, ?> service;
   /** Hadoop configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+  protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 
   /**
    * Default constructor allowing class invocation via Reflection.
@@ -61,46 +64,89 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   public InMemoryMessageStoreFactory() {
   }
 
-  @Override
-  public MessageStore<I, M> newStore(
-      MessageValueFactory<M> messageValueFactory) {
+  /**
+   * MessageStore to be used when combiner is enabled
+   *
+   * @param messageValueFactory message value factory
+   * @return message store
+   */
+  protected MessageStore<I, M> newStoreWithCombiner(
+    MessageValueFactory<M> messageValueFactory) {
     Class<M> messageClass = messageValueFactory.getValueClass();
     MessageStore messageStore;
-    if (conf.useMessageCombiner()) {
-      Class<I> vertexIdClass = conf.getVertexIdClass();
-      if (vertexIdClass.equals(IntWritable.class) &&
-          messageClass.equals(FloatWritable.class)) {
-        messageStore = new IntFloatMessageStore(
-            (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
-            (MessageCombiner<IntWritable, FloatWritable>)
-                conf.<FloatWritable>createMessageCombiner());
-      } else if (vertexIdClass.equals(LongWritable.class) &&
-          messageClass.equals(DoubleWritable.class)) {
-        messageStore = new LongDoubleMessageStore(
+    Class<I> vertexIdClass = conf.getVertexIdClass();
+    if (vertexIdClass.equals(IntWritable.class) &&
+        messageClass.equals(FloatWritable.class)) {
+      messageStore = new IntFloatMessageStore(
+          (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
+          (MessageCombiner<IntWritable, FloatWritable>)
+              conf.<FloatWritable>createMessageCombiner());
+    } else if (vertexIdClass.equals(LongWritable.class) &&
+        messageClass.equals(DoubleWritable.class)) {
+      messageStore = new LongDoubleMessageStore(
           (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
           (MessageCombiner<LongWritable, DoubleWritable>)
               conf.<DoubleWritable>createMessageCombiner());
-      } else {
-        messageStore = new OneMessagePerVertexStore<I, M>(messageValueFactory,
+    } else {
+      messageStore = new OneMessagePerVertexStore(messageValueFactory,
           service, conf.<M>createMessageCombiner(), conf);
+    }
+    return messageStore;
+  }
+
+  /**
+   * MessageStore to be used when combiner is not enabled
+   *
+   * @param messageValueFactory message value factory
+   * @return message store
+   */
+  protected MessageStore<I, M> newStoreWithoutCombiner(
+    MessageValueFactory<M> messageValueFactory) {
+    MessageStore messageStore = null;
+    MessageEncodeAndStoreType encodeAndStore = GiraphConstants
+        .MESSAGE_ENCODE_AND_STORE_TYPE.get(conf);
+    Class<I> vertexIdClass = conf.getVertexIdClass();
+    if (vertexIdClass.equals(IntWritable.class)) { // INT
+      messageStore = new IntByteArrayMessageStore(messageValueFactory,
+          service, conf);
+    } else if (vertexIdClass.equals(LongWritable.class)) { // LONG
+      if (encodeAndStore.equals(
+          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
+          encodeAndStore.equals(
+            MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
+        messageStore = new LongByteArrayMessageStore(messageValueFactory,
+            service, conf);
+      } else if (encodeAndStore.equals(
+          MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+        messageStore = new LongPointerListMessageStore(messageValueFactory,
+            service, conf);
       }
-    } else {
-      Class<I> vertexIdClass = conf.getVertexIdClass();
-      if (vertexIdClass.equals(IntWritable.class)) {
-        messageStore = new IntByteArrayMessageStore<M>(messageValueFactory,
-          (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
-          (ImmutableClassesGiraphConfiguration<IntWritable, Writable, Writable>)
-            conf);
-      } else if (vertexIdClass.equals(LongWritable.class)) {
-        messageStore = new LongByteArrayMessageStore<M>(messageValueFactory,
-          (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
-          (ImmutableClassesGiraphConfiguration<LongWritable, Writable,
-           Writable>) conf);
-      } else {
-        messageStore = new ByteArrayMessagesPerVertexStore<I, M>(
-          messageValueFactory, service, conf);
+    } else { // GENERAL
+      if (encodeAndStore.equals(
+          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
+          encodeAndStore.equals(
+              MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
+        messageStore = new ByteArrayMessagesPerVertexStore<>(
+            messageValueFactory, service, conf);
+      } else if (encodeAndStore.equals(
+          MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+        messageStore = new PointerListPerVertexStore(messageValueFactory,
+            service, conf);
       }
     }
+    return messageStore;
+  }
+
+  @Override
+  public MessageStore<I, M> newStore(
+      MessageValueFactory<M> messageValueFactory) {
+    Class<M> messageClass = messageValueFactory.getValueClass();
+    MessageStore messageStore;
+    if (conf.useMessageCombiner()) {
+      messageStore = newStoreWithCombiner(messageValueFactory);
+    } else {
+      messageStore = newStoreWithoutCombiner(messageValueFactory);
+    }
 
     if (LOG.isInfoEnabled()) {
       LOG.info("newStore: Created " + messageStore.getClass() +

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
new file mode 100644
index 0000000..7a5b702
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+/**
+ * How messages are encoded and stored. Stores are currently either
+ * pointer-list based or the default byte-array based
+ */
+public enum MessageEncodeAndStoreType {
+  /**
+   * Use message-store which is based on list of pointers to encoded messages
+   */
+  POINTER_LIST_PER_VERTEX(true),
+  /**
+   * Extract a byte array per partition from one message to many ids encoding
+   * and then store
+   */
+  EXTRACT_BYTEARRAY_PER_PARTITION(true),
+  /**
+   * Use a byte-array to store messages for each partition
+   */
+  BYTEARRAY_PER_PARTITION(false);
+
+  /** Can use one message to many ids encoding? */
+  private final boolean oneMessageToManyIdsEncoding;
+
+  /**
+   * Constructor
+   *
+   * @param oneMessageToManyIdsEncoding use one message to many ids encoding
+   */
+  MessageEncodeAndStoreType(boolean oneMessageToManyIdsEncoding) {
+    this.oneMessageToManyIdsEncoding = oneMessageToManyIdsEncoding;
+  }
+
+  /**
+   * True if one message to many ids encoding is set
+   * @return return oneMessageToManyIdsEncoding
+   */
+  public boolean useOneMessageToManyIdsEncoding() {
+    return oneMessageToManyIdsEncoding;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 7d0bbc6..6f1179a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -34,6 +34,14 @@ import org.apache.hadoop.io.WritableComparable;
 public interface MessageStore<I extends WritableComparable,
     M extends Writable> {
   /**
+   * True if this message-store encodes messages as a list of long pointers
+   * to compact serialized messages
+   *
+   * @return true if we encode messages as a list of pointers
+   */
+  boolean isPointerListEncoding();
+
+  /**
    * Gets messages for a vertex.  The lifetime of every message is only
    * guaranteed until the iterator's next() method is called. Do not hold
    * references to objects returned by this iterator.
@@ -79,6 +87,13 @@ public interface MessageStore<I extends WritableComparable,
     throws IOException;
 
   /**
+   * Called before the start of computation in bspworker.
+   * It is run from a single thread while no other thread is accessing
+   * the store, so the call is ensured to be thread-safe
+   */
+  void finalizeStore();
+
+  /**
    * Gets vertex ids from selected partition which we have messages for
    *
    * @param partitionId Id of partition

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 9bede06..d3942d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -63,6 +63,11 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
+  @Override
   public void addPartitionMessages(
       int partitionId,
       VertexIdMessages<I, M> messages) throws IOException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
new file mode 100644
index 0000000..e5a1691
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongListIterator;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Create an iterable for messages based on a pointer list
+ *
+ * @param <M> messageType
+ */
+public class PointerListMessagesIterable<M extends Writable>
+  implements Iterable<M> {
+  /** Message class */
+  private final MessageValueFactory<M> messageValueFactory;
+  /** List of pointers to messages in byte array */
+  private final LongArrayList pointers;
+  /** Holds the byte arrays of serialized messages */
+  private final ExtendedByteArrayOutputBuffer msgBuffer;
+  /** Reader to read data from byte buffer */
+  private final UnsafeReusableByteArrayInput messageReader;
+
+  /**
+   * Create an iterable over the messages referenced by a pointer list
+   * @param messageValueFactory message value factory
+   * @param pointers pointers to messages in buffer
+   * @param msgBuffer holds the byte arrays of serialized messages
+   */
+  public PointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
+    LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
+    this.messageValueFactory = messageValueFactory;
+    this.pointers = pointers;
+    this.msgBuffer = msgBuffer;
+    // TODO - if needed implement same for Safe as well
+    messageReader = new UnsafeReusableByteArrayInput();
+  }
+
+  /**
+   * Create message from factory
+   *
+   * @return message instance
+   */
+  protected M createMessage() {
+    return messageValueFactory.newInstance();
+  }
+
+  @Override
+  public Iterator<M> iterator() {
+    return new Iterator<M>() {
+      private final LongListIterator iterator = pointers.iterator();
+      private final M reusableMsg =
+        PointerListMessagesIterable.this.createMessage();
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public M next() {
+        long pointer = iterator.next();
+        try {
+          int index = (int) (pointer >>> 32); // high 32 bits pick the output
+          int offset = (int) pointer; // low 32 bits are the read offset
+          ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
+          messageReader.initialize(buffer.getByteArray(), offset,
+            buffer.getPos());
+          reusableMsg.readFields(messageReader);
+        } catch (IOException e) {
+          throw new IllegalStateException("Got exception : " + e, e);
+        }
+        return reusableMsg; // the same instance is refilled on every call
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
new file mode 100644
index 0000000..cce0439
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where multiple messages are
+ * stored as a list of long pointers to extended data output objects
+ * Used when there is no combiner provided.
+ *
+ * @param <I> vertexId type
+ * @param <M> message type
+ */
+public class PointerListPerVertexStore<I extends WritableComparable,
+  M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
+
+  /** Buffers of byte array outputs used to store messages - thread safe */
+  private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Message class held in the store
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public PointerListPerVertexStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    super(messageValueFactory, service, config);
+    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return true;
+  }
+
+  @Override
+  protected LongArrayList createList() {
+    return new LongArrayList();
+  }
+
+  @Override
+  public void addPartitionMessages(
+    int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+        messages.getVertexIdMessageIterator();
+    long pointer = 0; // kept across iterations unless isNewMessage()
+    LongArrayList list;
+    while (vertexIdMessageIterator.hasNext()) {
+      vertexIdMessageIterator.next();
+      M msg = vertexIdMessageIterator.getCurrentMessage();
+      list = getOrCreateList(vertexIdMessageIterator);
+      if (vertexIdMessageIterator.isNewMessage()) {
+        IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+        pointer = indexAndDataOut.getIndex();
+        pointer <<= 32; // index goes in the high 32 bits
+        ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+        pointer += dataOutput.getPos(); // position taken before the write
+        msg.write(dataOutput);
+      }
+      synchronized (list) {
+        list.add(pointer);
+      }
+    }
+  }
+
+  /**
+   * Get messages as an iterable from message storage
+   *
+   * @param pointers list of pointers to messages
+   * @return Messages as an iterable
+   */
+  @Override
+  public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
+    return new PointerListMessagesIterable<>(messageValueFactory, pointers,
+      bytesBuffer);
+  }
+
+  @Override
+  protected int getNumberOfMessagesIn(ConcurrentMap<I,
+    LongArrayList> partitionMap) {
+    int numberOfMessages = 0;
+    for (LongArrayList list : partitionMap.values()) {
+      numberOfMessages += list.size();
+    }
+    return numberOfMessages;
+  }
+
+  // FIXME -- checkpointing unsupported: write is a no-op, read returns null
+  @Override
+  protected void writeMessages(LongArrayList messages, DataOutput out)
+    throws IOException {
+
+  }
+
+  @Override
+  protected LongArrayList readFieldsForMessages(DataInput in)
+    throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 13292a2..37b8c05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -139,6 +139,10 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public void finalizeStore() {
+  }
+
+  @Override
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
     ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
     return (partitionMap == null) ? Collections.<I>emptyList() :

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 18b7798..3000cd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -83,6 +83,11 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
+  @Override
   public void addPartitionMessages(
       int partitionId,
       VertexIdMessages<I, M> messages) throws IOException {
@@ -106,6 +111,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   }
 
   @Override
+  public void finalizeStore() {
+  }
+
+  @Override
   public Iterable<M> getVertexMessages(I vertexId) throws IOException {
     if (hasMessagesForVertex(vertexId)) {
       return getMessageStore(vertexId).getVertexMessages(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index dbc1ce8..0012bf0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -94,6 +94,11 @@ public class IntByteArrayMessageStore<M extends Writable>
     }
   }
 
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
   /**
    * Get map which holds messages for partition which vertex belongs to.
    *
@@ -161,6 +166,10 @@ public class IntByteArrayMessageStore<M extends Writable>
   }
 
   @Override
+  public void finalizeStore() {
+  }
+
+  @Override
   public void clearPartition(int partitionId) throws IOException {
     map.get(partitionId).clear();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index be75ee8..97086e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -86,6 +86,11 @@ public class IntFloatMessageStore
     }
   }
 
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
   /**
    * Get map which holds messages for partition which vertex belongs to.
    *
@@ -126,6 +131,10 @@ public class IntFloatMessageStore
   }
 
   @Override
+  public void finalizeStore() {
+  }
+
+  @Override
   public void clearPartition(int partitionId) throws IOException {
     map.get(partitionId).clear();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
deleted file mode 100644
index 3110864..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.LongIterator;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> Message type
- */
-public class LongByteArrayMessageStore<M extends Writable>
-    implements MessageStore<LongWritable, M> {
-  /** Message value factory */
-  protected final MessageValueFactory<M> messageValueFactory;
-  /** Map from partition id to map from vertex id to message */
-  private final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>> map;
-  /** Service worker */
-  private final CentralizedServiceWorker<LongWritable, ?, ?> service;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?> config;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service      Service worker
-   * @param config       Hadoop configuration
-   */
-  public LongByteArrayMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
-        config) {
-    this.messageValueFactory = messageValueFactory;
-    this.service = service;
-    this.config = config;
-
-    map =
-        new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<LongWritable, Writable, Writable> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
-      Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-          new Long2ObjectOpenHashMap<DataInputOutput>(
-              (int) partition.getVertexCount());
-      map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition(partition);
-    }
-  }
-
-  /**
-   * Get map which holds messages for partition which vertex belongs to.
-   *
-   * @param vertexId Id of the vertex
-   * @return Map which holds messages for partition which vertex belongs to.
-   */
-  private Long2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
-      LongWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
-  }
-
-  /**
-   * Get the DataInputOutput for a vertex id, creating if necessary.
-   *
-   * @param partitionMap Partition map to look in
-   * @param vertexId Id of the vertex
-   * @return DataInputOutput for this vertex id (created if necessary)
-   */
-  private DataInputOutput getDataInputOutput(
-      Long2ObjectOpenHashMap<DataInputOutput> partitionMap,
-      long vertexId) {
-    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
-    if (dataInputOutput == null) {
-      dataInputOutput = config.createMessagesInputOutput();
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    return dataInputOutput;
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-      VertexIdMessages<LongWritable, M> messages) throws
-      IOException {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    synchronized (partitionMap) {
-      VertexIdMessageBytesIterator<LongWritable, M>
-          vertexIdMessageBytesIterator =
-          messages.getVertexIdMessageBytesIterator();
-      // Try to copy the message buffer over rather than
-      // doing a deserialization of a message just to know its size.  This
-      // should be more efficient for complex objects where serialization is
-      // expensive.  If this type of iterator is not available, fall back to
-      // deserializing/serializing the messages
-      if (vertexIdMessageBytesIterator != null) {
-        while (vertexIdMessageBytesIterator.hasNext()) {
-          vertexIdMessageBytesIterator.next();
-          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-              vertexIdMessageBytesIterator.getCurrentVertexId().get());
-          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              dataInputOutput.getDataOutput());
-        }
-      } else {
-        VertexIdMessageIterator<LongWritable, M>
-            iterator = messages.getVertexIdMessageIterator();
-        while (iterator.hasNext()) {
-          iterator.next();
-          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
-              iterator.getCurrentVertexId().get());
-          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-              dataInputOutput.getDataOutput());
-        }
-      }
-    }
-  }
-
-  @Override
-  public void clearPartition(int partitionId) throws IOException {
-    map.get(partitionId).clear();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(LongWritable vertexId) {
-    return getPartitionMap(vertexId).containsKey(vertexId.get());
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-      LongWritable vertexId) throws IOException {
-    DataInputOutput dataInputOutput =
-        getPartitionMap(vertexId).get(vertexId.get());
-    if (dataInputOutput == null) {
-      return EmptyIterable.get();
-    } else {
-      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
-    }
-  }
-
-  @Override
-  public void clearVertexMessages(LongWritable vertexId) throws IOException {
-    getPartitionMap(vertexId).remove(vertexId.get());
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    map.clear();
-  }
-
-  @Override
-  public Iterable<LongWritable> getPartitionDestinationVertices(
-      int partitionId) {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    List<LongWritable> vertices =
-        Lists.newArrayListWithCapacity(partitionMap.size());
-    LongIterator iterator = partitionMap.keySet().iterator();
-    while (iterator.hasNext()) {
-      vertices.add(new LongWritable(iterator.nextLong()));
-    }
-    return vertices;
-  }
-
-  @Override
-  public void writePartition(DataOutput out,
-      int partitionId) throws IOException {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    out.writeInt(partitionMap.size());
-    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
-        partitionMap.long2ObjectEntrySet().fastIterator();
-    while (iterator.hasNext()) {
-      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
-      out.writeLong(entry.getLongKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException {
-    int size = in.readInt();
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        new Long2ObjectOpenHashMap<DataInputOutput>(size);
-    while (size-- > 0) {
-      long vertexId = in.readLong();
-      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
-      dataInputOutput.readFields(in);
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    synchronized (map) {
-      map.put(partitionId, partitionMap);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 264e65a..b0452c1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -83,6 +83,11 @@ public class LongDoubleMessageStore
     }
   }
 
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
   /**
    * Get map which holds messages for partition which vertex belongs to.
    *
@@ -123,6 +128,10 @@ public class LongDoubleMessageStore
   }
 
   @Override
+  public void finalizeStore() {
+  }
+
+  @Override
   public void clearPartition(int partitionId) throws IOException {
     map.get(partitionId).clear();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
new file mode 100644
index 0000000..ae61de4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used. Keeps one list of type L per vertex id.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <L> list type
+ */
+public abstract class LongAbstractListMessageStore<M extends Writable,
+  L extends List> extends LongAbstractMessageStore<M, L> {
+  /**
+   * Map used to store messages for nascent vertices, i.e., ones
+   * that did not exist at the start of the current superstep but will get
+   * created because a message was sent to a non-existent vertex id
+   */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service             Service worker
+   * @param config              Giraph configuration
+   */
+  public LongAbstractListMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+    populateMap();
+
+    // create map for vertex ids (i.e., nascent vertices) not known yet
+    nascentMap = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+  }
+
+  /**
+   * Populate the map with all vertexIds for each partition
+   */
+  private void populateMap() { // TODO - can parallelize?
+    // populate with vertex ids already known
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, ?, ?> partition = service.getPartitionStore()
+          .getOrCreatePartition(partitionId);
+      Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+      for (Vertex<LongWritable, ?, ?> vertex : partition) {
+        partitionMap.put(vertex.getId().get(), createList());
+      }
+    }
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get list for the current vertexId
+   *
+   * @param iterator vertexId iterator
+   * @return list for current vertexId
+   */
+  protected L getList(
+    VertexIdIterator<LongWritable> iterator) {
+    PartitionOwner owner =
+        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
+    long vertexId = iterator.getCurrentVertexId().get();
+    int partitionId = owner.getPartitionId();
+    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+    if (!partitionMap.containsKey(vertexId)) {
+      synchronized (nascentMap) {
+        // assumption: not many nascent vertices are created
+        // so overall synchronization is negligible
+        Long2ObjectOpenHashMap<L> nascentPartitionMap =
+          nascentMap.get(partitionId);
+        if (nascentPartitionMap.get(vertexId) == null) {
+          nascentPartitionMap.put(vertexId, createList());
+        }
+        return nascentPartitionMap.get(vertexId);
+      }
+    }
+    return partitionMap.get(vertexId);
+  }
+
+  @Override
+  public void finalizeStore() {
+    for (int partitionId : nascentMap.keySet()) {
+      // nascent vertices are present only in nascent map
+      map.get(partitionId).putAll(nascentMap.get(partitionId));
+    }
+    nascentMap.clear();
+  }
+
+  // TODO - discussion
+  /*
+  Some approaches for ensuring correctness with parallel inserts:
+  - current approach: uses a small amount of extra memory by pre-populating
+  map & pushes everything map cannot handle to nascentMap;
+  at the beginning of the next superstep's compute, a single-threaded
+  finalizeStore is called (so little extra memory + 1 sequential finish op)
+  - use striped parallel fast utils instead (unsure of perf)
+  - use concurrent map (every get gets far slower)
+  - use reader-writer locks (unsure of perf)
+  (code looks something like the snippet below)
+
+      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+      rwl.readLock().lock();
+      L list = partitionMap.get(vertexId);
+      if (list == null) {
+        rwl.readLock().unlock();
+        rwl.writeLock().lock();
+        if (partitionMap.get(vertexId) == null) {
+          list = createList();
+          partitionMap.put(vertexId, list);
+        }
+        rwl.readLock().lock();
+        rwl.writeLock().unlock();
+      }
+      rwl.readLock().unlock();
+  - adapted from the Javadoc at
+    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
+    ReentrantReadWriteLock.html
+   */
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
new file mode 100644
index 0000000..9ee090e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <T> datastructure used to hold messages
+ */
+public abstract class LongAbstractMessageStore<M extends Writable, T>
+  implements MessageStore<LongWritable, M> {
+  /** Message value factory */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Map from partition id to map from vertex id to message */
+  protected final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
+  /** Service worker */
+  protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+  config;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service      Service worker
+   * @param config       Giraph configuration
+   */
+  public LongAbstractMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
+          config) {
+    this.messageValueFactory = messageValueFactory;
+    this.service = service;
+    this.config = config;
+
+    map = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, Writable, Writable> partition =
+          service.getPartitionStore().getOrCreatePartition(partitionId);
+      Long2ObjectOpenHashMap<T> partitionMap =
+          new Long2ObjectOpenHashMap<T>(
+              (int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+      service.getPartitionStore().putPartition(partition);
+    }
+  }
+
+  /**
+   * Get map which holds messages for partition which vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  protected Long2ObjectOpenHashMap<T> getPartitionMap(
+      LongWritable vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap =
+        map.get(partitionId);
+    List<LongWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    LongIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new LongWritable(iterator.nextLong()));
+    }
+    return vertices;
+  }
+
+}


Mime
View raw message