giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 49dbdf7
Date Mon, 08 Jul 2013 17:41:27 GMT
Updated Branches:
  refs/heads/trunk 01a46f9ba -> 49dbdf72e


GIRAPH-704: Specialized message stores (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/49dbdf72
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/49dbdf72
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/49dbdf72

Branch: refs/heads/trunk
Commit: 49dbdf72ee486c9f35992f7c95e2eae9a1c2efa1
Parents: 01a46f9
Author: Maja Kabiljo <majakabiljo@maja-mbp.local>
Authored: Mon Jul 8 10:35:02 2013 -0700
Committer: Maja Kabiljo <majakabiljo@maja-mbp.local>
Committed: Mon Jul 8 10:35:02 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/bsp/CentralizedServiceWorker.java    |   2 +-
 .../java/org/apache/giraph/comm/ServerData.java |   8 +-
 .../ByteArrayMessagesPerVertexStore.java        |  10 +-
 .../messages/InMemoryMessageStoreFactory.java   |  58 ++++-
 .../PartitionDiskBackedMessageStore.java        |  11 +-
 .../primitives/IntByteArrayMessageStore.java    | 233 ++++++++++++++++++
 .../primitives/IntFloatMessageStore.java        | 194 +++++++++++++++
 .../primitives/LongByteArrayMessageStore.java   | 234 +++++++++++++++++++
 .../primitives/LongDoubleMessageStore.java      | 194 +++++++++++++++
 .../comm/messages/primitives/package-info.java  |  22 ++
 .../giraph/utils/ByteArrayVertexIdData.java     |  10 +-
 .../org/apache/giraph/utils/WritableUtils.java  |  29 +++
 .../apache/giraph/worker/BspServiceWorker.java  |   9 +-
 .../apache/giraph/comm/RequestFailureTest.java  |   1 +
 .../org/apache/giraph/comm/RequestTest.java     |   1 +
 .../TestIntFloatPrimitiveMessageStores.java     | 171 ++++++++++++++
 .../TestLongDoublePrimitiveMessageStores.java   | 171 ++++++++++++++
 18 files changed, 1318 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0028ff9..ae6e96d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-704: Specialized message stores (majakabiljo)
+
   GIRAPH-703: create an appropriate way to generate the options.html page using maven.
   (armax00 via claudio)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 4b0f985..29567c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -140,7 +140,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * @param vertexId Vertex id
    * @return Partition id
    */
-  Integer getPartitionId(I vertexId);
+  int getPartitionId(I vertexId);
 
   /**
    * Whether a partition with given id exists on this worker.

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index affc260..6dd716a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -97,10 +97,6 @@ public class ServerData<I extends WritableComparable,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
     this.messageStoreFactory = messageStoreFactory;
-    currentMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
-    incomingMessageStore =
-        messageStoreFactory.newStore(conf.getIncomingMessageValueClass());
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
       partitionStore =
           new DiskBackedPartitionStore<I, V, E>(conf, context);
@@ -158,7 +154,9 @@ public class ServerData<I extends WritableComparable,
             "Failed to clear previous message store");
       }
     }
-    currentMessageStore = incomingMessageStore;
+    currentMessageStore =
+        incomingMessageStore != null ? incomingMessageStore :
+            messageStoreFactory.newStore(conf.getIncomingMessageValueClass());
     incomingMessageStore =
         messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index fecd7ee..16001f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -26,6 +26,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
 import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -174,18 +175,13 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   @Override
   protected void writeMessages(ExtendedDataOutput extendedDataOutput,
       DataOutput out) throws IOException {
-    out.writeInt(extendedDataOutput.getPos());
-    out.write(
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    WritableUtils.writeExtendedDataOutput(extendedDataOutput, out);
   }
 
   @Override
   protected ExtendedDataOutput readFieldsForMessages(DataInput in) throws
       IOException {
-    int byteArraySize = in.readInt();
-    byte[] messages = new byte[byteArraySize];
-    in.readFully(messages);
-    return config.createExtendedDataOutput(messages, 0);
+    return WritableUtils.readExtendedDataOutput(in, config);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index ba8a005..966f29e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -19,7 +19,16 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
+import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -57,20 +66,49 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
 
   @Override
   public MessageStore<I, M> newStore(Class<M> messageClass) {
+    MessageStore messageStore = null;
     if (conf.useCombiner()) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("newStore: " +
-            "Using OneMessagePerVertexStore with " + conf.getCombinerClass());
-      }
-      return new OneMessagePerVertexStore<I, M>(
+      Class<I> vertexIdClass = conf.getVertexIdClass();
+      if (vertexIdClass.equals(IntWritable.class) &&
+          messageClass.equals(FloatWritable.class)) {
+        messageStore = new IntFloatMessageStore(
+            (CentralizedServiceWorker<IntWritable, ?, ?>) service,
+            (Combiner<IntWritable, FloatWritable>)
+                conf.<FloatWritable>createCombiner());
+      } else if (vertexIdClass.equals(LongWritable.class) &&
+          messageClass.equals(DoubleWritable.class)) {
+        messageStore = new LongDoubleMessageStore(
+            (CentralizedServiceWorker<LongWritable, ?, ?>) service,
+            (Combiner<LongWritable, DoubleWritable>)
+                conf.<DoubleWritable>createCombiner());
+      } else {
+        messageStore = new OneMessagePerVertexStore<I, M>(
           messageClass, service, conf.<M>createCombiner(), conf);
-    } else {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("newStore: " +
-            "Using ByteArrayMessagesPerVertexStore since there is no combiner");
       }
-      return new ByteArrayMessagesPerVertexStore<I, M>(
+    } else {
+      Class<I> vertexIdClass = conf.getVertexIdClass();
+      if (vertexIdClass.equals(IntWritable.class)) {
+        messageStore = new IntByteArrayMessageStore<M>(messageClass,
+            (CentralizedServiceWorker<IntWritable, ?, ?>) service,
+            (ImmutableClassesGiraphConfiguration<IntWritable, ?, ?>) conf);
+      } else if (vertexIdClass.equals(LongWritable.class)) {
+        messageStore = new LongByteArrayMessageStore<M>(messageClass,
+            (CentralizedServiceWorker<LongWritable, ?, ?>) service,
+            (ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>) conf);
+      } else {
+        messageStore = new ByteArrayMessagesPerVertexStore<I, M>(
           messageClass, service, conf);
+      }
     }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newStore: Created " + messageStore.getClass() +
+          " for vertex id " + conf.getVertexIdClass() +
+          " and message value " + conf.getOutgoingMessageValueClass() + " and" +
+          (conf.useCombiner() ? " combiner " + conf.getCombinerClass() :
+              " no combiner"));
+    }
+    return (MessageStore<I, M>) messageStore;
+
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index 4ae805a..0a96b05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -38,6 +38,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -248,8 +249,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     out.writeInt(inMemoryMessages.size());
     for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
       entry.getKey().write(out);
-      out.writeInt(entry.getValue().getPos());
-      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
+      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
     }
 
     // write file stores
@@ -277,11 +277,8 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     for (int m = 0; m < mapSize; m++) {
       I vertexId = config.createVertexId();
       vertexId.readFields(in);
-      int messageBytes = in.readInt();
-      byte[] buf = new byte[messageBytes];
-      ExtendedDataOutput extendedDataOutput =
-          config.createExtendedDataOutput(buf, messageBytes);
-      inMemoryMessages.put(vertexId, extendedDataOutput);
+      inMemoryMessages.put(vertexId,
+          WritableUtils.readExtendedDataOutput(in, config));
     }
 
     // read file stores

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
new file mode 100644
index 0000000..ed11e33
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are IntWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> Message type
+ */
+public class IntByteArrayMessageStore<M extends Writable>
+    implements MessageStore<IntWritable, M> {
+  /** Message class */
+  protected final Class<M> messageClass;
+  /** Map from partition id to map from vertex id to serialized messages */
+  private final
+  Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>> map;
+  /** Service worker */
+  private final CentralizedServiceWorker<IntWritable, ?, ?> service;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config;
+
+  /**
+   * Constructor
+   *
+   * @param messageClass Message class held in the store
+   * @param service      Service worker
+   * @param config       Giraph configuration
+   */
+  public IntByteArrayMessageStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<IntWritable, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config) {
+    this.messageClass = messageClass;
+    this.service = service;
+    this.config = config;
+
+    map =
+        new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<IntWritable, ?, ?> partition =
+          service.getPartitionStore().getPartition(partitionId);
+      Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+          new Int2ObjectOpenHashMap<ExtendedDataOutput>(
+              (int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get the map which holds messages for the partition the vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for the partition the vertex belongs to.
+   */
+  private Int2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+      IntWritable vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  /**
+   * Get the extended data output for a vertex id, creating if necessary.
+   *
+   * @param partitionMap Partition map to look in
+   * @param vertexId     Id of the vertex
+   * @return Extended data output for this vertex id (created if necessary)
+   */
+  private ExtendedDataOutput getExtendedDataOutput(
+      Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+      int vertexId) {
+    ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
+      partitionMap.put(vertexId, extendedDataOutput);
+    }
+    return extendedDataOutput;
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+      ByteArrayVertexIdMessages<IntWritable, M> messages) throws
+      IOException {
+    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    synchronized (partitionMap) {
+      ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageBytesIterator
+          vertexIdMessageBytesIterator =
+          messages.getVertexIdMessageBytesIterator();
+      // Try to copy the message buffer over rather than
+      // doing a deserialization of a message just to know its size.  This
+      // should be more efficient for complex objects where serialization is
+      // expensive.  If this type of iterator is not available, fall back to
+      // deserializing/serializing the messages
+      if (vertexIdMessageBytesIterator != null) {
+        while (vertexIdMessageBytesIterator.hasNext()) {
+          vertexIdMessageBytesIterator.next();
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              getExtendedDataOutput(partitionMap,
+                  vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+        }
+      } else {
+        ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageIterator
+            iterator = messages.getVertexIdMessageIterator();
+        while (iterator.hasNext()) {
+          iterator.next();
+          iterator.getCurrentMessage().write(
+              getExtendedDataOutput(partitionMap,
+                  iterator.getCurrentVertexId().get()));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(IntWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(
+      IntWritable vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput =
+        getPartitionMap(vertexId).get(vertexId.get());
+    if (extendedDataOutput == null) {
+      return EmptyIterable.get();
+    } else {
+      return new MessagesIterable<M>(config, messageClass,
+          extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(IntWritable vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<IntWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    List<IntWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    IntIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new IntWritable(iterator.nextInt()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Int2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+        partitionMap.int2ObjectEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Int2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+      out.writeInt(entry.getIntKey());
+      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    int size = in.readInt();
+    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        new Int2ObjectOpenHashMap<ExtendedDataOutput>(size);
+    while (size-- > 0) {
+      int vertexId = in.readInt();
+      partitionMap.put(vertexId,
+          WritableUtils.readExtendedDataOutput(in, config));
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
new file mode 100644
index 0000000..a193fb9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2FloatMap;
+import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are IntWritable and messages
+ * are FloatWritable and combiner is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ */
+public class IntFloatMessageStore
+    implements MessageStore<IntWritable, FloatWritable> {
+  /** Map from partition id to map from vertex id to message */
+  private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
+  /** Message combiner */
+  private final Combiner<IntWritable, FloatWritable> combiner;
+  /** Service worker */
+  private final CentralizedServiceWorker<IntWritable, ?, ?> service;
+
+  /**
+   * Constructor
+   *
+   * @param service Service worker
+   * @param combiner Message combiner
+   */
+  public IntFloatMessageStore(
+      CentralizedServiceWorker<IntWritable, ?, ?> service,
+      Combiner<IntWritable, FloatWritable> combiner) {
+    this.service = service;
+    this.combiner = combiner;
+
+    map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<IntWritable, ?, ?> partition =
+          service.getPartitionStore().getPartition(partitionId);
+      Int2FloatOpenHashMap partitionMap =
+          new Int2FloatOpenHashMap((int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get the map which holds messages for the partition the vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for the partition the vertex belongs to.
+   */
+  private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+      ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages) throws
+      IOException {
+    IntWritable reusableVertexId = new IntWritable();
+    FloatWritable reusableMessage = new FloatWritable();
+    FloatWritable reusableCurrentMessage = new FloatWritable();
+
+    Int2FloatOpenHashMap partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      ByteArrayVertexIdMessages<IntWritable,
+          FloatWritable>.VertexIdMessageIterator
+          iterator = messages.getVertexIdMessageIterator();
+      while (iterator.hasNext()) {
+        iterator.next();
+        int vertexId = iterator.getCurrentVertexId().get();
+        float message = iterator.getCurrentMessage().get();
+        if (partitionMap.containsKey(vertexId)) {
+          reusableVertexId.set(vertexId);
+          reusableMessage.set(message);
+          reusableCurrentMessage.set(partitionMap.get(vertexId));
+          combiner.combine(reusableVertexId, reusableCurrentMessage,
+              reusableMessage);
+          message = reusableCurrentMessage.get();
+        }
+        partitionMap.put(vertexId, message);
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(IntWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public Iterable<FloatWritable> getVertexMessages(
+      IntWritable vertexId) throws IOException {
+    Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
+    if (!partitionMap.containsKey(vertexId.get())) {
+      return EmptyIterable.get();
+    } else {
+      return Collections.singleton(
+          new FloatWritable(partitionMap.get(vertexId.get())));
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(IntWritable vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<IntWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Int2FloatOpenHashMap partitionMap = map.get(partitionId);
+    List<IntWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    IntIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new IntWritable(iterator.nextInt()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    Int2FloatOpenHashMap partitionMap = map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Int2FloatMap.Entry> iterator =
+        partitionMap.int2FloatEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Int2FloatMap.Entry entry = iterator.next();
+      out.writeInt(entry.getIntKey());
+      out.writeFloat(entry.getFloatValue());
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    int size = in.readInt();
+    Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(size);
+    while (size-- > 0) {
+      int vertexId = in.readInt();
+      float message = in.readFloat();
+      partitionMap.put(vertexId, message);
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
new file mode 100644
index 0000000..f25972e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> Message type
+ */
+public class LongByteArrayMessageStore<M extends Writable>
+    implements MessageStore<LongWritable, M> {
+  /** Message class */
+  protected final Class<M> messageClass;
+  /** Map from partition id to map from vertex id to serialized messages */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>> map;
+  /** Service worker */
+  private final CentralizedServiceWorker<LongWritable, ?, ?> service;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?> config;
+
+  /**
+   * Constructor
+   *
+   * @param messageClass Message class held in the store
+   * @param service      Service worker
+   * @param config       Giraph configuration
+   */
+  public LongByteArrayMessageStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<LongWritable, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<LongWritable, ?, ?> config) {
+    this.messageClass = messageClass;
+    this.service = service;
+    this.config = config;
+
+    map =
+        new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, ?, ?> partition =
+          service.getPartitionStore().getPartition(partitionId);
+      Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+          new Long2ObjectOpenHashMap<ExtendedDataOutput>(
+              (int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get the map which holds messages for the partition the vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for the partition the vertex belongs to.
+   */
+  private Long2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+      LongWritable vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  /**
+   * Get the extended data output for a vertex id, creating if necessary.
+   *
+   * @param partitionMap Partition map to look in
+   * @param vertexId Id of the vertex
+   * @return Extended data output for this vertex id (created if necessary)
+   */
+  private ExtendedDataOutput getExtendedDataOutput(
+      Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+      long vertexId) {
+    ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
+      partitionMap.put(vertexId, extendedDataOutput);
+    }
+    return extendedDataOutput;
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+      ByteArrayVertexIdMessages<LongWritable, M> messages) throws
+      IOException {
+    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    synchronized (partitionMap) {
+      ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageBytesIterator
+          vertexIdMessageBytesIterator =
+          messages.getVertexIdMessageBytesIterator();
+      // Try to copy the message buffer over rather than
+      // doing a deserialization of a message just to know its size.  This
+      // should be more efficient for complex objects where serialization is
+      // expensive.  If this type of iterator is not available, fall back to
+      // deserializing/serializing the messages
+      if (vertexIdMessageBytesIterator != null) {
+        while (vertexIdMessageBytesIterator.hasNext()) {
+          vertexIdMessageBytesIterator.next();
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              getExtendedDataOutput(partitionMap,
+                  vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+        }
+      } else {
+        ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageIterator
+            iterator = messages.getVertexIdMessageIterator();
+        while (iterator.hasNext()) {
+          iterator.next();
+          iterator.getCurrentMessage().write(
+              getExtendedDataOutput(partitionMap,
+                  iterator.getCurrentVertexId().get()));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(
+      LongWritable vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput =
+        getPartitionMap(vertexId).get(vertexId.get());
+    if (extendedDataOutput == null) {
+      return EmptyIterable.get();
+    } else {
+      return new MessagesIterable<M>(config, messageClass,
+          extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    List<LongWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    LongIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new LongWritable(iterator.nextLong()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Long2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+        partitionMap.long2ObjectEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Long2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+      out.writeLong(entry.getLongKey());
+      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    int size = in.readInt();
+    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+        new Long2ObjectOpenHashMap<ExtendedDataOutput>(size);
+    while (size-- > 0) {
+      long vertexId = in.readLong();
+      partitionMap.put(vertexId,
+          WritableUtils.readExtendedDataOutput(in, config));
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
new file mode 100644
index 0000000..96ed5b4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
+import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are LongWritable, messages
+ * are DoubleWritable and a combiner is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ */
+public class LongDoubleMessageStore
+    implements MessageStore<LongWritable, DoubleWritable> {
+  /** Map from partition id to map from vertex id to message */
+  private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
+  /** Message combiner */
+  private final Combiner<LongWritable, DoubleWritable> combiner;
+  /** Service worker */
+  private final CentralizedServiceWorker<LongWritable, ?, ?> service;
+
+  /**
+   * Constructor
+   *
+   * @param service Service worker
+   * @param combiner Message combiner
+   */
+  public LongDoubleMessageStore(
+      CentralizedServiceWorker<LongWritable, ?, ?> service,
+      Combiner<LongWritable, DoubleWritable> combiner) {
+    this.service = service;
+    this.combiner = combiner;
+
+    map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, ?, ?> partition =
+          service.getPartitionStore().getPartition(partitionId);
+      Long2DoubleOpenHashMap partitionMap =
+          new Long2DoubleOpenHashMap((int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get the map which holds messages for the partition the vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for the partition the vertex belongs to.
+   */
+  private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+      ByteArrayVertexIdMessages<LongWritable, DoubleWritable> messages) throws
+      IOException {
+    LongWritable reusableVertexId = new LongWritable();
+    DoubleWritable reusableMessage = new DoubleWritable();
+    DoubleWritable reusableCurrentMessage = new DoubleWritable();
+
+    Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      ByteArrayVertexIdMessages<LongWritable,
+          DoubleWritable>.VertexIdMessageIterator
+          iterator = messages.getVertexIdMessageIterator();
+      while (iterator.hasNext()) {
+        iterator.next();
+        long vertexId = iterator.getCurrentVertexId().get();
+        double message = iterator.getCurrentMessage().get();
+        if (partitionMap.containsKey(vertexId)) {
+          reusableVertexId.set(vertexId);
+          reusableMessage.set(message);
+          reusableCurrentMessage.set(partitionMap.get(vertexId));
+          combiner.combine(reusableVertexId, reusableCurrentMessage,
+              reusableMessage);
+          message = reusableCurrentMessage.get();
+        }
+        partitionMap.put(vertexId, message);
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public Iterable<DoubleWritable> getVertexMessages(
+      LongWritable vertexId) throws IOException {
+    Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
+    if (!partitionMap.containsKey(vertexId.get())) {
+      return EmptyIterable.get();
+    } else {
+      return Collections.singleton(
+          new DoubleWritable(partitionMap.get(vertexId.get())));
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
+    List<LongWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    LongIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new LongWritable(iterator.nextLong()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Long2DoubleMap.Entry> iterator =
+        partitionMap.long2DoubleEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Long2DoubleMap.Entry entry = iterator.next();
+      out.writeLong(entry.getLongKey());
+      out.writeDouble(entry.getDoubleValue());
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    int size = in.readInt();
+    Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(size);
+    while (size-- > 0) {
+      long vertexId = in.readLong();
+      double message = in.readDouble();
+      partitionMap.put(vertexId, message);
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/package-info.java
new file mode 100644
index 0000000..1035110
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of message stores specialized for particular vertex id and message
+ * types, with or without a combiner.
+ */
+package org.apache.giraph.comm.messages.primitives;

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 9b3f165..26c547b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -155,17 +155,13 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
 
   @Override
   public void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeInt(extendedDataOutput.getPos());
-    dataOutput.write(extendedDataOutput.getByteArray(), 0,
-        extendedDataOutput.getPos());
+    WritableUtils.writeExtendedDataOutput(extendedDataOutput, dataOutput);
   }
 
   @Override
   public void readFields(DataInput dataInput) throws IOException {
-    int size = dataInput.readInt();
-    byte[] buf = new byte[size];
-    dataInput.readFully(buf);
-    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+    extendedDataOutput =
+        WritableUtils.readExtendedDataOutput(dataInput, configuration);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index c78d717..695b08d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -335,6 +335,35 @@ public class WritableUtils {
   }
 
   /**
+   * Write ExtendedDataOutput to DataOutput: its size (int), then its bytes
+   *
+   * @param extendedDataOutput ExtendedDataOutput to write
+   * @param out DataOutput to write to
+   */
+  public static void writeExtendedDataOutput(
+      ExtendedDataOutput extendedDataOutput, DataOutput out)
+    throws IOException {
+    out.writeInt(extendedDataOutput.getPos());
+    out.write(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+  }
+
+  /**
+   * Read ExtendedDataOutput from DataInput: its size (int), then its bytes
+   *
+   * @param in DataInput to read from
+   * @param conf Configuration
+   * @return ExtendedDataOutput read
+   */
+  public static ExtendedDataOutput readExtendedDataOutput(DataInput in,
+      ImmutableClassesGiraphConfiguration conf) throws IOException {
+    int size = in.readInt();
+    byte[] buf = new byte[size];
+    in.readFully(buf);
+    return conf.createExtendedDataOutput(buf, size);
+  }
+
+  /**
    * Write vertex data to byte array with the first 4 bytes as the size of the
    * entire buffer (including the size).
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 89b6f9e..52bac3f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1226,10 +1226,8 @@ public class BspServiceWorker<I extends WritableComparable,
                 " on " + partitionsFile);
           }
           partition.readFields(partitionsStream);
-          if (partitionsStream.readBoolean()) {
-            getServerData().getCurrentMessageStore().readFieldsForPartition(
-                partitionsStream, partitionId);
-          }
+          getServerData().getIncomingMessageStore().readFieldsForPartition(
+              partitionsStream, partitionId);
           partitionsStream.close();
           if (LOG.isInfoEnabled()) {
             LOG.info("loadCheckpoint: Loaded partition " +
@@ -1274,6 +1272,7 @@ public class BspServiceWorker<I extends WritableComparable,
           e);
     }
 
+    getServerData().prepareSuperstep();
     // Communication service needs to setup the connections prior to
     // processing vertices
 /*if[HADOOP_NON_SECURE]
@@ -1492,7 +1491,7 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Integer getPartitionId(I vertexId) {
+  public int getPartitionId(I vertexId) {
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
     return partitionOwner.getPartitionId();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 35e6362..c2c8568 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -152,6 +152,7 @@ public class RequestFailureTest {
   private void checkSendingTwoRequests() throws IOException {
     // Start the service
     serverData = MockUtils.createNewServerData(conf, context);
+    serverData.prepareSuperstep();
     WorkerInfo workerInfo = new WorkerInfo();
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index c8c09df..d7664ef 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -85,6 +85,7 @@ public class RequestTest {
 
     // Start the service
     serverData = MockUtils.createNewServerData(conf, context);
+    serverData.prepareSuperstep();
     workerInfo = new WorkerInfo();
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
new file mode 100644
index 0000000..fd3a496
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TestIntFloatPrimitiveMessageStores {
+  private static final int NUM_PARTITIONS = 2;
+  private static CentralizedServiceWorker<IntWritable, ?, ?> service;
+
+  @Before
+  public void prepare() throws IOException {
+    service = Mockito.mock(CentralizedServiceWorker.class);
+    Mockito.when(
+        service.getPartitionId(Mockito.any(IntWritable.class))).thenAnswer(
+        new Answer<Integer>() {
+          @Override
+          public Integer answer(InvocationOnMock invocation) {
+            IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
+            return vertexId.get() % NUM_PARTITIONS;
+          }
+        }
+    );
+    PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
+    Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+    Mockito.when(partitionStore.getPartitionIds()).thenReturn(
+        Lists.newArrayList(0, 1));
+    Partition partition = Mockito.mock(Partition.class);
+    Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
+    Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
+    Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+  }
+
+  private static class IntFloatNoOpComputation extends
+      BasicComputation<IntWritable, NullWritable, NullWritable,
+          FloatWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, NullWritable, NullWritable> vertex,
+        Iterable<FloatWritable> messages) throws IOException {
+    }
+  }
+
+  private static ImmutableClassesGiraphConfiguration<IntWritable, ?, ?>
+  createIntFloatConf() {
+    GiraphConfiguration initConf = new GiraphConfiguration();
+    initConf.setComputationClass(IntFloatNoOpComputation.class);
+    return new ImmutableClassesGiraphConfiguration(initConf);
+  }
+
+  private static ByteArrayVertexIdMessages<IntWritable, FloatWritable>
+  createIntFloatMessages() {
+    ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages =
+        new ByteArrayVertexIdMessages<IntWritable, FloatWritable>(
+            FloatWritable.class);
+    messages.setConf(createIntFloatConf());
+    messages.initialize();
+    return messages;
+  }
+
+  private static void insertIntFloatMessages(
+      MessageStore<IntWritable, FloatWritable> messageStore) throws
+      IOException {
+    ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages =
+        createIntFloatMessages();
+    messages.add(new IntWritable(0), new FloatWritable(1));
+    messages.add(new IntWritable(2), new FloatWritable(3));
+    messages.add(new IntWritable(0), new FloatWritable(4));
+    messageStore.addPartitionMessages(0, messages);
+    messages = createIntFloatMessages();
+    messages.add(new IntWritable(1), new FloatWritable(1));
+    messages.add(new IntWritable(1), new FloatWritable(3));
+    messages.add(new IntWritable(1), new FloatWritable(4));
+    messageStore.addPartitionMessages(1, messages);
+    messages = createIntFloatMessages();
+    messages.add(new IntWritable(0), new FloatWritable(5));
+    messageStore.addPartitionMessages(0, messages);
+  }
+
+  @Test
+  public void testIntFloatMessageStore() throws IOException {
+    IntFloatMessageStore messageStore =
+        new IntFloatMessageStore(service, new FloatSumCombiner());
+    insertIntFloatMessages(messageStore);
+
+    Iterable<FloatWritable> m0 =
+        messageStore.getVertexMessages(new IntWritable(0));
+    Assert.assertEquals(1, Iterables.size(m0));
+    Assert.assertEquals((float) 10.0, m0.iterator().next().get());
+    Iterable<FloatWritable> m1 =
+        messageStore.getVertexMessages(new IntWritable(1));
+    Assert.assertEquals(1, Iterables.size(m1));
+    Assert.assertEquals((float) 8.0, m1.iterator().next().get());
+    Iterable<FloatWritable> m2 =
+        messageStore.getVertexMessages(new IntWritable(2));
+    Assert.assertEquals(1, Iterables.size(m2));
+    Assert.assertEquals((float) 3.0, m2.iterator().next().get());
+    Assert.assertTrue(
+        Iterables.isEmpty(messageStore.getVertexMessages(new IntWritable(3))));
+  }
+
+  @Test
+  public void testIntByteArrayMessageStore() throws IOException {
+    IntByteArrayMessageStore<FloatWritable> messageStore =
+        new IntByteArrayMessageStore<FloatWritable>(FloatWritable.class,
+            service, createIntFloatConf());
+    insertIntFloatMessages(messageStore);
+
+    Iterable<FloatWritable> m0 =
+        messageStore.getVertexMessages(new IntWritable(0));
+    Assert.assertEquals(3, Iterables.size(m0));
+    Iterator<FloatWritable> i0 = m0.iterator();
+    Assert.assertEquals((float) 1.0, i0.next().get());
+    Assert.assertEquals((float) 4.0, i0.next().get());
+    Assert.assertEquals((float) 5.0, i0.next().get());
+    Iterable<FloatWritable> m1 =
+        messageStore.getVertexMessages(new IntWritable(1));
+    Assert.assertEquals(3, Iterables.size(m1));
+    Iterator<FloatWritable> i1 = m1.iterator();
+    Assert.assertEquals((float) 1.0, i1.next().get());
+    Assert.assertEquals((float) 3.0, i1.next().get());
+    Assert.assertEquals((float) 4.0, i1.next().get());
+    Iterable<FloatWritable> m2 =
+        messageStore.getVertexMessages(new IntWritable(2));
+    Assert.assertEquals(1, Iterables.size(m2));
+    Assert.assertEquals((float) 3.0, m2.iterator().next().get());
+    Assert.assertTrue(
+        Iterables.isEmpty(messageStore.getVertexMessages(new IntWritable(3))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/49dbdf72/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
new file mode 100644
index 0000000..5a69062
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/** Tests the long-id, double-message stores against a mocked worker. */
+public class TestLongDoublePrimitiveMessageStores {
+  private static final int NUM_PARTITIONS = 2;
+  private static CentralizedServiceWorker<LongWritable, ?, ?> service;
+
+  /**
+   * Rebuilds the mocked service: a vertex id maps to partition
+   * (id % NUM_PARTITIONS), and partitions 0 and 1 are both backed by one
+   * mocked partition reporting a vertex count of 1.
+   */
+  @Before
+  public void prepare() throws IOException {
+    Answer<Integer> partitionOfVertex = new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) {
+        LongWritable id = (LongWritable) invocation.getArguments()[0];
+        return (int) (id.get() % NUM_PARTITIONS);
+      }
+    };
+    service = Mockito.mock(CentralizedServiceWorker.class);
+    Mockito.when(service.getPartitionId(Mockito.any(LongWritable.class)))
+        .thenAnswer(partitionOfVertex);
+    PartitionStore partitions = Mockito.mock(PartitionStore.class);
+    Mockito.when(service.getPartitionStore()).thenReturn(partitions);
+    Mockito.when(partitions.getPartitionIds())
+        .thenReturn(Lists.newArrayList(0, 1));
+    Partition oneVertex = Mockito.mock(Partition.class);
+    Mockito.when(oneVertex.getVertexCount()).thenReturn(Long.valueOf(1));
+    Mockito.when(partitions.getPartition(0)).thenReturn(oneVertex);
+    Mockito.when(partitions.getPartition(1)).thenReturn(oneVertex);
+  }
+
+  /** Empty computation that the test configuration is created with. */
+  private static class LongDoubleNoOpComputation extends BasicComputation<
+      LongWritable, NullWritable, NullWritable, DoubleWritable> {
+    @Override
+    public void compute(Vertex<LongWritable, NullWritable, NullWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
+      // Nothing to do
+    }
+  }
+
+  /** Creates a configuration whose computation is the no-op computation. */
+  private static ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+  createLongDoubleConf() {
+    GiraphConfiguration baseConf = new GiraphConfiguration();
+    baseConf.setComputationClass(LongDoubleNoOpComputation.class);
+    return new ImmutableClassesGiraphConfiguration(baseConf);
+  }
+
+  /** Creates a new message batch with its configuration set and initialized. */
+  private static ByteArrayVertexIdMessages<LongWritable, DoubleWritable>
+  createLongDoubleMessages() {
+    ByteArrayVertexIdMessages<LongWritable, DoubleWritable> batch =
+        new ByteArrayVertexIdMessages<LongWritable, DoubleWritable>(
+            DoubleWritable.class);
+    batch.setConf(createLongDoubleConf());
+    batch.initialize();
+    return batch;
+  }
+
+  /**
+   * Adds three batches to the store (partitions 0, 1, then 0 again): vertex 0
+   * is sent 1, 4 and 5, vertex 1 is sent 1, 3 and 4, and vertex 2 is sent 3.
+   */
+  private static void insertLongDoubleMessages(
+      MessageStore<LongWritable, DoubleWritable> messageStore)
+      throws IOException {
+    ByteArrayVertexIdMessages<LongWritable, DoubleWritable> batch =
+        createLongDoubleMessages();
+    batch.add(new LongWritable(0L), new DoubleWritable(1.0));
+    batch.add(new LongWritable(2L), new DoubleWritable(3.0));
+    batch.add(new LongWritable(0L), new DoubleWritable(4.0));
+    messageStore.addPartitionMessages(0, batch);
+    batch = createLongDoubleMessages();
+    batch.add(new LongWritable(1L), new DoubleWritable(1.0));
+    batch.add(new LongWritable(1L), new DoubleWritable(3.0));
+    batch.add(new LongWritable(1L), new DoubleWritable(4.0));
+    messageStore.addPartitionMessages(1, batch);
+    batch = createLongDoubleMessages();
+    batch.add(new LongWritable(0L), new DoubleWritable(5.0));
+    messageStore.addPartitionMessages(0, batch);
+  }
+
+  /** Asserts that {@code actual} yields exactly {@code expected}, in order. */
+  private static void assertMessages(Iterable<DoubleWritable> actual,
+      double... expected) {
+    Assert.assertEquals(expected.length, Iterables.size(actual));
+    Iterator<DoubleWritable> it = actual.iterator();
+    for (double value : expected) {
+      Assert.assertEquals(value, it.next().get());
+    }
+  }
+
+  @Test
+  public void testLongDoubleMessageStore() throws IOException {
+    LongDoubleMessageStore store =
+        new LongDoubleMessageStore(service, new DoubleSumCombiner());
+    insertLongDoubleMessages(store);
+
+    // Expect one combined value per vertex: 1 + 4 + 5, 1 + 3 + 4 and 3
+    assertMessages(store.getVertexMessages(new LongWritable(0)), 10.0);
+    assertMessages(store.getVertexMessages(new LongWritable(1)), 8.0);
+    assertMessages(store.getVertexMessages(new LongWritable(2)), 3.0);
+    Assert.assertTrue(
+        Iterables.isEmpty(store.getVertexMessages(new LongWritable(3))));
+  }
+
+  @Test
+  public void testLongByteArrayMessageStore() throws IOException {
+    LongByteArrayMessageStore<DoubleWritable> store =
+        new LongByteArrayMessageStore<DoubleWritable>(DoubleWritable.class,
+            service, createLongDoubleConf());
+    insertLongDoubleMessages(store);
+
+    // No combiner is passed here: expect each message back, in insert order
+    assertMessages(store.getVertexMessages(new LongWritable(0)), 1.0, 4.0, 5.0);
+    assertMessages(store.getVertexMessages(new LongWritable(1)), 1.0, 3.0, 4.0);
+    assertMessages(store.getVertexMessages(new LongWritable(2)), 3.0);
+    Assert.assertTrue(
+        Iterables.isEmpty(store.getVertexMessages(new LongWritable(3))));
+  }
+}


Mime
View raw message