giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pava...@apache.org
Subject [2/2] git commit: updated refs/heads/trunk to 1852057
Date Wed, 17 Sep 2014 21:04:42 GMT
GIRAPH-938: Allow fast working with primitives generically (ikabiljo via pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/18520570
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/18520570
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/18520570

Branch: refs/heads/trunk
Commit: 185205703ee4cd886598f5393395f19fc367f65f
Parents: f6845a3
Author: Pavan Kumar <pavanka@fb.com>
Authored: Wed Sep 17 14:02:40 2014 -0700
Committer: Pavan Kumar <pavanka@fb.com>
Committed: Wed Sep 17 14:02:40 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../messages/InMemoryMessageStoreFactory.java   |  34 +-
 .../primitives/IdByteArrayMessageStore.java     | 246 ++++++++
 .../primitives/IdOneMessagePerVertexStore.java  | 226 +++++++
 .../apache/giraph/edge/IdAndNullArrayEdges.java | 184 ++++++
 .../giraph/edge/IdAndValueArrayEdges.java       | 248 ++++++++
 .../apache/giraph/types/ops/BooleanTypeOps.java |  53 ++
 .../apache/giraph/types/ops/ByteTypeOps.java    |  52 ++
 .../apache/giraph/types/ops/DoubleTypeOps.java  |  52 ++
 .../apache/giraph/types/ops/FloatTypeOps.java   |  52 ++
 .../org/apache/giraph/types/ops/IntTypeOps.java |  68 ++
 .../apache/giraph/types/ops/LongTypeOps.java    |  68 ++
 .../org/apache/giraph/types/ops/MapTypeOps.java |  47 ++
 .../giraph/types/ops/PrimitiveIdTypeOps.java    |  55 ++
 .../giraph/types/ops/PrimitiveTypeOps.java      |  42 ++
 .../apache/giraph/types/ops/TextTypeOps.java    |  46 ++
 .../org/apache/giraph/types/ops/TypeOps.java    |  51 ++
 .../apache/giraph/types/ops/TypeOpsUtils.java   | 149 +++++
 .../types/ops/collections/Basic2ObjectMap.java  | 322 ++++++++++
 .../types/ops/collections/BasicArrayList.java   | 632 +++++++++++++++++++
 .../giraph/types/ops/collections/BasicSet.java  | 206 ++++++
 .../ops/collections/ResettableIterator.java     |  32 +
 .../types/ops/collections/WritableWriter.java   |  47 ++
 .../types/ops/collections/package-info.java     |  21 +
 .../apache/giraph/types/ops/package-info.java   |  21 +
 25 files changed, 2949 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 34db15c..d9398e7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-938: Allow fast working with primitives generically (ikabiljo via pavanka)
+
   GIRAPH-945: Always use job Configuration to create Configuration (majakabiljo)
 
   GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 02ea7b2..ae86c56 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -20,15 +20,19 @@ package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
-import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -89,8 +93,17 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
           (MessageCombiner<LongWritable, DoubleWritable>)
               conf.<DoubleWritable>createMessageCombiner());
     } else {
-      messageStore = new OneMessagePerVertexStore(messageValueFactory,
-          service, conf.<M>createMessageCombiner(), conf);
+      PrimitiveIdTypeOps<I> idTypeOps =
+          TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
+      if (idTypeOps != null) {
+        messageStore = new IdOneMessagePerVertexStore<>(
+            messageValueFactory, service, conf.<M>createMessageCombiner(),
+            conf);
+      } else {
+        messageStore =
+            new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
+                conf.<M>createMessageCombiner(), conf);
+      }
     }
     return messageStore;
   }
@@ -127,11 +140,18 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
           encodeAndStore.equals(
               MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
-        messageStore = new ByteArrayMessagesPerVertexStore<>(
-            messageValueFactory, service, conf);
+        PrimitiveIdTypeOps<I> idTypeOps =
+            TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
+        if (idTypeOps != null) {
+          messageStore = new IdByteArrayMessageStore<>(
+              messageValueFactory, service, conf);
+        } else {
+          messageStore = new ByteArrayMessagesPerVertexStore<>(
+              messageValueFactory, service, conf);
+        }
       } else if (encodeAndStore.equals(
           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
-        messageStore = new PointerListPerVertexStore(messageValueFactory,
+        messageStore = new PointerListPerVertexStore<>(messageValueFactory,
             service, conf);
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
new file mode 100644
index 0000000..efe6199
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.messages.primitives;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Special message store to be used when IDs are primitive and no combiner is
+ * used.
+ * Data is backed by primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class IdByteArrayMessageStore<I extends WritableComparable,
+    M extends Writable> implements MessageStore<I, M> {
+  /** Message value factory */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Map from partition id to map from vertex id to serialized messages */
+  private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, ?, ?> service;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Vertex id TypeOps */
+  private final PrimitiveIdTypeOps<I> idTypeOps;
+  /** WritableWriter for values in this message store */
+  private final WritableWriter<DataInputOutput>
+  dataInputOutputWriter = new WritableWriter<DataInputOutput>() {
+    @Override
+    public DataInputOutput readFields(DataInput in) throws IOException {
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      return dataInputOutput;
+    }
+
+    @Override
+    public void write(DataOutput out, DataInputOutput value)
+      throws IOException {
+      value.write(out);
+    }
+  };
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service Service worker
+   * @param config Giraph configuration
+   */
+  public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    this.messageValueFactory = messageValueFactory;
+    this.service = service;
+    this.config = config;
+
+    idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
+
+    map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<I, ?, ?> partition =
+          service.getPartitionStore().getOrCreatePartition(partitionId);
+      Basic2ObjectMap<I, DataInputOutput> partitionMap =
+          idTypeOps.create2ObjectOpenHashMap(
+              Math.max(10, (int) partition.getVertexCount()));
+
+      map.put(partitionId, partitionMap);
+      service.getPartitionStore().putPartition((Partition) partition);
+    }
+  }
+
+  /**
+   * Get map which holds messages for partition which vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  /**
+   * Get the DataInputOutput for a vertex id, creating if necessary.
+   *
+   * @param partitionMap Partition map to look in
+   * @param vertexId Id of the vertex
+   * @return DataInputOutput for this vertex id (created if necessary)
+   */
+  private DataInputOutput getDataInputOutput(
+      Basic2ObjectMap<I, DataInputOutput> partitionMap,
+      I vertexId) {
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
+      partitionMap.put(vertexId, dataInputOutput);
+    }
+    return dataInputOutput;
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+      VertexIdMessages<I, M> messages) throws IOException {
+    Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
+          messages.getVertexIdMessageBytesIterator();
+      // Try to copy the message buffer over rather than
+      // doing a deserialization of a message just to know its size. This
+      // should be more efficient for complex objects where serialization is
+      // expensive. If this type of iterator is not available, fall back to
+      // deserializing/serializing the messages
+      if (vertexIdMessageBytesIterator != null) {
+        while (vertexIdMessageBytesIterator.hasNext()) {
+          vertexIdMessageBytesIterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(
+              partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId());
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              dataInputOutput.getDataOutput());
+        }
+      } else {
+        VertexIdMessageIterator<I, M> iterator =
+            messages.getVertexIdMessageIterator();
+        while (iterator.hasNext()) {
+          iterator.next();
+          DataInputOutput dataInputOutput =
+              getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
+
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+              dataInputOutput.getDataOutput());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId);
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
+    if (dataInputOutput == null) {
+      return EmptyIterable.get();
+    } else {
+      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
+    List<I> vertices = Lists.newArrayListWithCapacity(partitionMap.size());
+    Iterator<I> iterator = partitionMap.fastKeyIterator();
+    while (iterator.hasNext()) {
+      vertices.add(idTypeOps.createCopy(iterator.next()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+    Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
+    partitionMap.write(out, dataInputOutputWriter);
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+    Basic2ObjectMap<I, DataInputOutput> partitionMap =
+        idTypeOps.create2ObjectOpenHashMap(10);
+    partitionMap.readFields(in, dataInputOutputWriter);
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
new file mode 100644
index 0000000..c72bedf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.messages.primitives;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Special message store to be used when IDs are primitive, messages need not
+ * be primitive, and a message combiner is used.
+ * Data is backed by primitive keyed maps in order to decrease number of
+ * objects and get better performance.
+ * (keys are using primitives, values are using objects, even if they
+ * are primitive)
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class IdOneMessagePerVertexStore<I extends WritableComparable,
+    M extends Writable> implements MessageStore<I, M> {
+  /** Map from partition id to map from vertex id to message */
+  private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
+  /** Message value factory */
+  private final MessageValueFactory<M> messageValueFactory;
+  /** Message combiner */
+  private final MessageCombiner<I, M> messageCombiner;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, ?, ?> service;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Vertex id TypeOps */
+  private final PrimitiveIdTypeOps<I> idTypeOps;
+  /** WritableWriter for values in this message store */
+  private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
+    @Override
+    public M readFields(DataInput in) throws IOException {
+      M message = messageValueFactory.newInstance();
+      message.readFields(in);
+      return message;
+    }
+
+    @Override
+    public void write(DataOutput out, M value) throws IOException {
+      value.write(out);
+    }
+  };
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Message value factory
+   * @param service Service worker
+   * @param messageCombiner Message combiner
+   * @param config Config
+   */
+  public IdOneMessagePerVertexStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<I, ?, ?> service,
+      MessageCombiner<I, M> messageCombiner,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    this.service = service;
+    this.config = config;
+    this.messageValueFactory = messageValueFactory;
+    this.messageCombiner = messageCombiner;
+
+    idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
+
+    map = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<I, ?, ?> partition =
+          service.getPartitionStore().getOrCreatePartition(partitionId);
+      Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
+          (int) partition.getVertexCount());
+      map.put(partitionId, partitionMap);
+      service.getPartitionStore().putPartition((Partition) partition);
+    }
+  }
+
+  /**
+   * Get map which holds messages for partition which vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
+    return map.get(service.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId,
+      VertexIdMessages<I, M> messages) throws IOException {
+    Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      VertexIdMessageIterator<I, M>
+          iterator = messages.getVertexIdMessageIterator();
+      // This loop is a little complicated as it is optimized to create as
+      // few vertex id and message objects as possible.
+      while (iterator.hasNext()) {
+        iterator.next();
+        I vertexId = iterator.getCurrentVertexId();
+        M currentMessage =
+            partitionMap.get(iterator.getCurrentVertexId());
+        if (currentMessage == null) {
+          M newMessage = messageCombiner.createInitialMessage();
+          currentMessage = partitionMap.put(
+              iterator.getCurrentVertexId(), newMessage);
+          if (currentMessage == null) {
+            currentMessage = newMessage;
+          }
+        }
+        messageCombiner.combine(vertexId, currentMessage,
+          iterator.getCurrentMessage());
+      }
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId);
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(
+      I vertexId) throws IOException {
+    Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
+    if (!partitionMap.containsKey(vertexId)) {
+      return EmptyIterable.get();
+    } else {
+      return Collections.singleton(partitionMap.get(vertexId));
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    getPartitionMap(vertexId).remove(vertexId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(
+      int partitionId) {
+    Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
+    List<I> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    Iterator<I> iterator = partitionMap.fastKeyIterator();
+    while (iterator.hasNext()) {
+      vertices.add(idTypeOps.createCopy(iterator.next()));
+    }
+    return vertices;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
+    partitionMap.write(out, messageWriter);
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(10);
+    partitionMap.readFields(in, messageWriter);
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java
new file mode 100644
index 0000000..7de5d2a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Implementation of {@link OutEdges} with null edge values, for vertex ID
+ * types supported by {@link PrimitiveIdTypeOps}.
+ * Backed by a dynamic primitive array. Parallel edges are allowed.
+ * Note: this implementation is optimized for space
+ * usage, but random access and edge removals are expensive.
+ *
+ * @param <I> Vertex id type
+ */
+public class IdAndNullArrayEdges<I extends WritableComparable>
+  implements ReuseObjectsOutEdges<I, NullWritable>,
+  MutableOutEdges<I, NullWritable>,
+  ImmutableClassesGiraphConfigurable<I, Writable, NullWritable> {
+
+  /** Array of target vertex ids. */
+  private BasicArrayList<I> neighbors;
+
+  @Override
+  public
+  ImmutableClassesGiraphConfiguration<I, Writable, NullWritable> getConf() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, NullWritable> conf) {
+    PrimitiveIdTypeOps<I> idTypeOps =
+        TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass());
+    neighbors = idTypeOps.createArrayList(10);
+    if (!conf.getEdgeValueClass().equals(NullWritable.class)) {
+      throw new IllegalArgumentException(
+          "IdAndNullArrayEdges can be used only with NullWritable " +
+          "as edgeValueClass, not with " + conf.getEdgeValueClass());
+    }
+  }
+
+  @Override
+  public void initialize(Iterable<Edge<I, NullWritable>> edges) {
+    EdgeIterables.initialize(this, edges);
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors.setCapacity(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    initialize(10);
+  }
+
+  @Override
+  public void add(Edge<I, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId());
+  }
+
+  /**
+   * If the backing array is more than four times as big as the number of
+   * elements, reduce its capacity to twice the current number of elements.
+   */
+  private void trim() {
+    if (neighbors.capacity() > 4 * neighbors.size()) {
+      neighbors.setCapacity(neighbors.size() * 2);
+    }
+  }
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void removeAt(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    I tmpValue = neighbors.getElementTypeOps().create();
+    neighbors.popInto(tmpValue);
+    if (i != neighbors.size()) {
+      neighbors.set(i, tmpValue);
+    }
+    // If needed after the removal, trim the array.
+    trim();
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    // Thanks to the constant-time implementation of removeAt(int),
+    // we can remove all matching edges in linear time.
+    I tmpValue = neighbors.getElementTypeOps().create();
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      neighbors.getInto(i, tmpValue);
+      if (tmpValue.equals(targetVertexId)) {
+        removeAt(i);
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  @Override
+  public Iterator<Edge<I, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    // The downcast is fine because all concrete Edge implementations are
+    // mutable, but we only expose the mutation functionality when appropriate.
+    return (Iterator) mutableIterator();
+  }
+
+  @Override
+  public Iterator<MutableEdge<I, NullWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<I, NullWritable>>() {
+      /** Current position in the array. */
+      private int offset = 0;
+      /** Representative edge object. */
+      private final MutableEdge<I, NullWritable> representativeEdge =
+          EdgeFactory.createReusable(neighbors.getElementTypeOps().create());
+
+      @Override
+      public boolean hasNext() {
+        return offset < neighbors.size();
+      }
+
+      @Override
+      public MutableEdge<I, NullWritable> next() {
+        neighbors.getInto(offset++, representativeEdge.getTargetVertexId());
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        // Since removeAt() might replace the deleted edge with the last edge
+        // in the array, we need to decrease the offset so that the latter
+        // won't be skipped.
+        removeAt(--offset);
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    neighbors.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    neighbors.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java
new file mode 100644
index 0000000..b99692f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Implementation of {@link OutEdges} with IDs and Edge values having their
+ * TypeOps.
+ * Data is backed by a dynamic primitive array. Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage, but random access
+ * and edge removals are expensive.
+ *
+ * @param <I> Vertex id type
+ * @param <E> Edge value type
+ */
+public class IdAndValueArrayEdges<I extends WritableComparable,
+    E extends Writable> implements ReuseObjectsOutEdges<I, E>,
+    MutableOutEdges<I, E>,
+    ImmutableClassesGiraphConfigurable<I, Writable, E> {
+
+  /** Array of target vertex ids. */
+  private BasicArrayList<I> neighborIds;
+  /** Array of edge values. */
+  private BasicArrayList<E> neighborEdgeValues;
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, Writable, E> getConf() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
+    PrimitiveIdTypeOps<I> idTypeOps =
+        TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass());
+    neighborIds = idTypeOps.createArrayList(10);
+
+    PrimitiveTypeOps<E> edgeTypeOps =
+        TypeOpsUtils.getPrimitiveTypeOps(conf.getEdgeValueClass());
+    neighborEdgeValues = edgeTypeOps.createArrayList(10);
+  }
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    EdgeIterables.initialize(this, edges);
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighborIds.setCapacity(capacity);
+    neighborEdgeValues.setCapacity(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    initialize(10);
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    neighborIds.add(edge.getTargetVertexId());
+    neighborEdgeValues.add(edge.getValue());
+  }
+
+  /**
+   * If the backing array is more than four times as big as the number of
+   * elements, reduce to 2 times current size.
+   */
+  private void trim() {
+    if (neighborIds.capacity() > 4 * neighborIds.size()) {
+      neighborIds.setCapacity(neighborIds.size() * 2);
+      neighborEdgeValues.setCapacity(neighborIds.size() * 2);
+    }
+  }
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void removeAt(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    I tmpId = neighborIds.getElementTypeOps().create();
+    E tmpValue = neighborEdgeValues.getElementTypeOps().create();
+
+    neighborIds.popInto(tmpId);
+    neighborEdgeValues.popInto(tmpValue);
+    if (i != neighborIds.size()) {
+      neighborIds.set(i, tmpId);
+      neighborEdgeValues.set(i, tmpValue);
+    }
+    // If needed after the removal, trim the array.
+    trim();
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    // Thanks to the constant-time implementation of removeAt(int),
+    // we can remove all matching edges in linear time.
+    I tmpId = neighborIds.getElementTypeOps().create();
+    for (int i = neighborIds.size() - 1; i >= 0; --i) {
+      neighborIds.getInto(i, tmpId);
+      if (tmpId.equals(targetVertexId)) {
+        removeAt(i);
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return neighborIds.size();
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<I, E>>() {
+      private int index;
+
+      /** Representative edge object. */
+      private final Edge<I, E> representativeEdge = EdgeFactory.create(
+          neighborIds.getElementTypeOps().create(),
+          neighborEdgeValues.getElementTypeOps().create());
+
+      @Override
+      public boolean hasNext() {
+        return index < neighborIds.size();
+      }
+
+      @Override
+      public Edge<I, E> next() {
+        neighborIds.getInto(index, representativeEdge.getTargetVertexId());
+        neighborEdgeValues.getInto(index, representativeEdge.getValue());
+        index++;
+        return representativeEdge;
+      }
+    };
+  }
+
+  /** Helper class for a mutable edge that modifies the backing arrays. */
+  private class ArrayMutableEdge extends DefaultEdge<I, E> {
+    /** Index of the edge in the backing arrays. */
+    private int index;
+
+    /** Constructor. */
+    public ArrayMutableEdge() {
+      super(
+          neighborIds.getElementTypeOps().create(),
+          neighborEdgeValues.getElementTypeOps().create());
+    }
+
+    /**
+     * Make the edge point to the given index in the backing arrays.
+     *
+     * @param index Index in the arrays
+     */
+    public void setIndex(int index) {
+      // Update the id and value objects from the superclass.
+      neighborIds.getInto(index, getTargetVertexId());
+      neighborEdgeValues.getInto(index, getValue());
+      // Update the index.
+      this.index = index;
+    }
+
+    @Override
+    public void setValue(E value) {
+      // Update the value object from the superclass.
+      neighborEdgeValues.getElementTypeOps().set(getValue(), value);
+      // Update the value stored in the backing array.
+      neighborEdgeValues.set(index, value);
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<I, E>> mutableIterator() {
+    return new Iterator<MutableEdge<I, E>>() {
+      /** Current position in the array. */
+      private int index = 0;
+      /** Representative edge object. */
+      private final ArrayMutableEdge representativeEdge =
+          new ArrayMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return index < neighborIds.size();
+      }
+
+      @Override
+      public MutableEdge<I, E> next() {
+        representativeEdge.setIndex(index++);
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        // Since removeAt() might replace the deleted edge with the last edge
+        // in the array, we need to decrease the index so that the latter
+        // won't be skipped.
+        removeAt(--index);
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    neighborIds.write(out);
+    neighborEdgeValues.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    neighborIds.readFields(in);
+    neighborEdgeValues.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java
new file mode 100644
index 0000000..a65fa3b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicBooleanArrayList;
+import org.apache.hadoop.io.BooleanWritable;
+
+
+/** TypeOps implementation for working with BooleanWritable type */
+public enum BooleanTypeOps implements PrimitiveTypeOps<BooleanWritable> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<BooleanWritable> getTypeClass() {
+    return BooleanWritable.class;
+  }
+
+  @Override
+  public BooleanWritable create() {
+    return new BooleanWritable();
+  }
+
+  @Override
+  public BooleanWritable createCopy(BooleanWritable from) {
+    return new BooleanWritable(from.get());
+  }
+
+  @Override
+  public void set(BooleanWritable to, BooleanWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicBooleanArrayList createArrayList(int capacity) {
+    return new BasicBooleanArrayList(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
new file mode 100644
index 0000000..2b27ba5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicByteArrayList;
+import org.apache.hadoop.io.ByteWritable;
+
+/** TypeOps implementation for working with ByteWritable type */
+public enum ByteTypeOps implements PrimitiveTypeOps<ByteWritable> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<ByteWritable> getTypeClass() {
+    return ByteWritable.class;
+  }
+
+  @Override
+  public ByteWritable create() {
+    return new ByteWritable();
+  }
+
+  @Override
+  public ByteWritable createCopy(ByteWritable from) {
+    return new ByteWritable(from.get());
+  }
+
+  @Override
+  public void set(ByteWritable to, ByteWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicByteArrayList createArrayList(int capacity) {
+    return new BasicByteArrayList(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
new file mode 100644
index 0000000..af8c38f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicDoubleArrayList;
+import org.apache.hadoop.io.DoubleWritable;
+
+/** TypeOps implementation for working with DoubleWritable type */
+public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<DoubleWritable> getTypeClass() {
+    return DoubleWritable.class;
+  }
+
+  @Override
+  public DoubleWritable create() {
+    return new DoubleWritable();
+  }
+
+  @Override
+  public DoubleWritable createCopy(DoubleWritable from) {
+    return new DoubleWritable(from.get());
+  }
+
+  @Override
+  public void set(DoubleWritable to, DoubleWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicDoubleArrayList createArrayList(int capacity) {
+    return new BasicDoubleArrayList(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
new file mode 100644
index 0000000..3ca8409
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicFloatArrayList;
+import org.apache.hadoop.io.FloatWritable;
+
+/** TypeOps implementation for working with FloatWritable type */
+public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<FloatWritable> getTypeClass() {
+    return FloatWritable.class;
+  }
+
+  @Override
+  public FloatWritable create() {
+    return new FloatWritable();
+  }
+
+  @Override
+  public FloatWritable createCopy(FloatWritable from) {
+    return new FloatWritable(from.get());
+  }
+
+  @Override
+  public void set(FloatWritable to, FloatWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicFloatArrayList createArrayList(int capacity) {
+    return new BasicFloatArrayList(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
new file mode 100644
index 0000000..f9a32c0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicInt2ObjectOpenHashMap;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicIntArrayList;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet;
+import org.apache.hadoop.io.IntWritable;
+
+/** TypeOps implementation for working with IntWritable type */
+public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> {
+  /** Singleton instance */
+  INSTANCE;
+
+  @Override
+  public Class<IntWritable> getTypeClass() {
+    return IntWritable.class;
+  }
+
+  @Override
+  public IntWritable create() {
+    return new IntWritable();
+  }
+
+  @Override
+  public IntWritable createCopy(IntWritable from) {
+    return new IntWritable(from.get());
+  }
+
+  @Override
+  public void set(IntWritable to, IntWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicSet<IntWritable> createOpenHashSet(int capacity) {
+    return new BasicIntOpenHashSet(capacity);
+  }
+
+  @Override
+  public BasicArrayList<IntWritable> createArrayList(int capacity) {
+    return new BasicIntArrayList(capacity);
+  }
+
+  @Override
+  public <V> Basic2ObjectMap<IntWritable, V> create2ObjectOpenHashMap(
+      int capacity) {
+    return new BasicInt2ObjectOpenHashMap<>(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
new file mode 100644
index 0000000..4e5ca54
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicLong2ObjectOpenHashMap;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet;
+import org.apache.hadoop.io.LongWritable;
+
+/** TypeOps implementation for working with LongWritable type */
+public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> {
+  /** Singleton instance */
+  INSTANCE;
+
+  @Override
+  public Class<LongWritable> getTypeClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public LongWritable create() {
+    return new LongWritable();
+  }
+
+  @Override
+  public LongWritable createCopy(LongWritable from) {
+    return new LongWritable(from.get());
+  }
+
+  @Override
+  public void set(LongWritable to, LongWritable from) {
+    to.set(from.get());
+  }
+
+  @Override
+  public BasicSet<LongWritable> createOpenHashSet(int capacity) {
+    return new BasicLongOpenHashSet(capacity);
+  }
+
+  @Override
+  public BasicArrayList<LongWritable> createArrayList(int capacity) {
+    return new BasicLongArrayList(capacity);
+  }
+
+  @Override
+  public <V> Basic2ObjectMap<LongWritable, V> create2ObjectOpenHashMap(
+      int capacity) {
+    return new BasicLong2ObjectOpenHashMap<>(capacity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java
new file mode 100644
index 0000000..cd9f079
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.hadoop.io.MapWritable;
+
+/** TypeOps implementation for working with MapWritable type */
+public enum MapTypeOps implements TypeOps<MapWritable> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<MapWritable> getTypeClass() {
+    return MapWritable.class;
+  }
+
+  @Override
+  public MapWritable create() {
+    return new MapWritable();
+  }
+
+  @Override
+  public MapWritable createCopy(MapWritable from) {
+    return new MapWritable(from);
+  }
+
+  @Override
+  public void set(MapWritable to, MapWritable from) {
+    to.clear();
+    to.putAll(from);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
new file mode 100644
index 0000000..29b0c6e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicSet;
+
+
+
+/**
+ * Additional type operations to TypeOps for types that can be IDs,
+ * and so can be used as keys in maps and values in sets.
+ *
+ * Using any of the provided operations should lead to no boxing/unboxing.
+ *
+ * Useful generic wrappers to fastutil libraries are provided,
+ * so that you can look at them generically.
+ *
+ * @param <T> Type
+ */
+public interface PrimitiveIdTypeOps<T> extends PrimitiveTypeOps<T> {
+  // primitive collections
+
+  /**
+   * Create BasicSet of type T, given capacity.
+   * @param capacity Capacity
+   * @return BasicSet
+   */
+  BasicSet<T> createOpenHashSet(int capacity);
+
+  /**
+   * Create Basic2ObjectMap with key type T, given capacity.
+   * Values are represented as object, even if they can be primitive.
+   *
+   * @param capacity Capacity
+   * @param <V> Type of values in the map
+   * @return Basic2ObjectMap
+   */
+  <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap(int capacity);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java
new file mode 100644
index 0000000..72b684f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+
+
+/**
+ * Type operations, allowing working generically with types,
+ * but still having efficient code.
+ *
+ * Using any of the provided operations should lead to no boxing/unboxing.
+ *
+ * Useful generic wrappers to fastutil libraries are provided,
+ * so that you can look at them generically.
+ *
+ * @param <T> Type
+ */
+public interface PrimitiveTypeOps<T> extends TypeOps<T> {
+  // primitive collections
+  /**
+   * Create BasicArrayList of type T, given capacity.
+   * @param capacity Capacity
+   * @return BasicArrayList
+   */
+  BasicArrayList<T> createArrayList(int capacity);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java
new file mode 100644
index 0000000..c785cd9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.hadoop.io.Text;
+
+/** TypeOps implementation for working with Text type */
+public enum TextTypeOps implements TypeOps<Text> {
+  /** Singleton instance */
+  INSTANCE();
+
+  @Override
+  public Class<Text> getTypeClass() {
+    return Text.class;
+  }
+
+  @Override
+  public Text create() {
+    return new Text();
+  }
+
+  @Override
+  public Text createCopy(Text from) {
+    return new Text(from);
+  }
+
+  @Override
+  public void set(Text to, Text from) {
+    to.set(from); // copies only the valid getLength() bytes
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
new file mode 100644
index 0000000..b7f9479
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+
+/**
+ * Type operations, allowing working generically with mutable types,
+ * but still having efficient code.
+ * For example, by reducing object allocation via reuse.
+ *
+ * @param <T> Type
+ */
+public interface TypeOps<T> {
+  /**
+   * Class object for generic type T.
+   * @return Class<T> object
+   */
+  Class<T> getTypeClass();
+  /**
+   * Create new instance of type T.
+   * @return new instance
+   */
+  T create();
+  /**
+   * Create a copy of passed object
+   * @param from Object to copy
+   * @return Copy
+   */
+  T createCopy(T from);
+  /**
+   * Copies value from the second argument into the first.
+   * @param to Object into which value should be copied
+   * @param from Object whose value is to be copied
+   */
+  void set(T to, T from);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
new file mode 100644
index 0000000..df5f2bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Utility functions for getting TypeOps instances from class types.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TypeOpsUtils {
+  /** No instances */
+  private TypeOpsUtils() { }
+
+  /**
+   * Get PrimitiveIdTypeOps for given type, or null if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return PrimitiveIdTypeOps
+   */
+  public static <T>
+  PrimitiveIdTypeOps<T> getPrimitiveIdTypeOpsOrNull(Class<T> type) {
+    if (type.equals(LongWritable.class)) {
+      return (PrimitiveIdTypeOps) LongTypeOps.INSTANCE;
+    } else if (type.equals(IntWritable.class)) {
+      return (PrimitiveIdTypeOps) IntTypeOps.INSTANCE;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get PrimitiveIdTypeOps for given type.
+   * Exception will be thrown if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return PrimitiveIdTypeOps
+   */
+  public static <T>
+  PrimitiveIdTypeOps<T> getPrimitiveIdTypeOps(Class<T> type) {
+    PrimitiveIdTypeOps<T> typeOps = getPrimitiveIdTypeOpsOrNull(type);
+    if (typeOps != null) {
+      return typeOps;
+    } else {
+      throw new IllegalArgumentException(
+          type + " not supported in PrimitiveIdTypeOps");
+    }
+  }
+
+  /**
+   * Get PrimitiveTypeOps for given type, or null if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return PrimitiveTypeOps
+   */
+  public static <T>
+  PrimitiveTypeOps<T> getPrimitiveTypeOpsOrNull(Class<T> type) {
+    PrimitiveTypeOps<T> typeOps = getPrimitiveIdTypeOpsOrNull(type);
+    if (typeOps != null) {
+      return typeOps;
+    } else if (type.equals(FloatWritable.class)) {
+      return (PrimitiveTypeOps) FloatTypeOps.INSTANCE;
+    } else if (type.equals(DoubleWritable.class)) {
+      return (PrimitiveTypeOps) DoubleTypeOps.INSTANCE;
+    } else if (type.equals(BooleanWritable.class)) {
+      return (PrimitiveTypeOps) BooleanTypeOps.INSTANCE;
+    } else if (type.equals(ByteWritable.class)) {
+      return (PrimitiveTypeOps) ByteTypeOps.INSTANCE;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get PrimitiveTypeOps for given type.
+   * Exception will be thrown if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return PrimitiveTypeOps
+   */
+  public static <T>
+  PrimitiveTypeOps<T> getPrimitiveTypeOps(Class<T> type) {
+    PrimitiveTypeOps<T> typeOps = getPrimitiveTypeOpsOrNull(type);
+    if (typeOps != null) {
+      return typeOps;
+    } else {
+      throw new IllegalArgumentException(
+          type + " not supported in PrimitiveTypeOps");
+    }
+  }
+
+  /**
+   * Get TypeOps for given type, or null if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return TypeOps
+   */
+  public static <T> TypeOps<T> getTypeOpsOrNull(Class<T> type) {
+    TypeOps<T> typeOps = getPrimitiveTypeOpsOrNull(type);
+    if (typeOps != null) {
+      return typeOps;
+    } else if (type.equals(Text.class)) {
+      return (TypeOps) TextTypeOps.INSTANCE;
+    } else if (type.equals(MapWritable.class)) {
+      return (TypeOps) MapTypeOps.INSTANCE;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get TypeOps for given type.
+   * Exception will be thrown if there is none.
+   * @param type Class type
+   * @param <T> Type
+   * @return TypeOps
+   */
+  public static <T> TypeOps<T> getTypeOps(Class<T> type) {
+    TypeOps<T> typeOps = getTypeOpsOrNull(type);
+    if (typeOps != null) {
+      return typeOps;
+    } else {
+      throw new IllegalArgumentException(
+          type + " not supported in TypeOps");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
new file mode 100644
index 0000000..f7ef570
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops.collections;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.types.ops.IntTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Basic2ObjectMap with only basic set of operations.
+ * Keys returned by fastKeyIterator() are reusable objects,
+ * which are modified by subsequent calls to the iterator.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+public abstract class Basic2ObjectMap<K, V> {
+  /** Removes all of the elements from this map. */
+  public abstract void clear();
+  /**
+   * Number of elements in this map
+   * @return size
+   */
+  public abstract int size();
+
+  /**
+   * Checks whether key is present in the map
+   * @param key Key
+   * @return true if present
+   */
+  public abstract boolean containsKey(K key);
+  /**
+   * Adds a pair to the map.
+   *
+   * @param key Key
+   * @param value Value.
+   * @return the old value, or null if no value was present for the given key.
+   */
+  public abstract V put(K key, V value);
+  /**
+   * Get value for a given key
+   * @param key Key
+   * @return Value, or null
+   */
+  public abstract V get(K key);
+  /**
+   * Removes the mapping with the given key.
+   *
+   * @param key Key
+   * @return the old value, or null if no value was present for the given key.
+   */
+  public abstract V remove(K key);
+
+  /**
+   * TypeOps for type of keys this object holds
+   * @return TypeOps
+   */
+  public abstract PrimitiveIdTypeOps<K> getKeyTypeOps();
+
+  /**
+   * Fast iterator over keys within this map, which doesn't allocate new
+   * element for each returned element.
+   *
+   * Object returned by next() is only valid until next() is called again,
+   * because it is reused.
+   *
+   * @return Iterator
+   */
+  public abstract Iterator<K> fastKeyIterator();
+
+  /**
+   * Serializes the object, given a writer for values.
+   * @param out <code>DataOutput</code> to serialize object into.
+   * @param writer Writer of values
+   * @throws IOException
+   */
+  public abstract void write(DataOutput out, WritableWriter<V> writer)
+    throws IOException;
+  /**
+   * Deserializes the object, given a writer for values.
+   * @param in <code>DataInput</code> to deserialize object from.
+   * @param writer Writer of values, used here to read them back
+   * @throws IOException
+   */
+  public abstract void readFields(DataInput in, WritableWriter<V> writer)
+    throws IOException;
+
+  /**
+   * Iterator that reuses key object.
+   *
+   * @param <Iter> Primitive key iterator type
+   */
+  protected abstract class ReusableIterator<Iter extends Iterator<?>>
+      implements Iterator<K> {
+    /** Primitive Key iterator */
+    protected final Iter iter;
+    /** Reusable key object */
+    protected final K reusableKey = getKeyTypeOps().create();
+
+    /**
+     * Constructor
+     * @param iter Primitive Key iterator
+     */
+    public ReusableIterator(Iter iter) {
+      this.iter = iter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public void remove() {
+      iter.remove();
+    }
+  }
+
+  /** IntWritable implementation of Basic2ObjectMap */
+  public static final class BasicInt2ObjectOpenHashMap<V>
+      extends Basic2ObjectMap<IntWritable, V> {
+    /** Map */
+    private final Int2ObjectOpenHashMap<V> map;
+
+    /**
+     * Constructor
+     * @param capacity Capacity
+     */
+    public BasicInt2ObjectOpenHashMap(int capacity) {
+      this.map = new Int2ObjectOpenHashMap<>(capacity);
+    }
+
+    @Override
+    public void clear() {
+      map.clear();
+    }
+
+    @Override
+    public int size() {
+      return map.size();
+    }
+
+    @Override
+    public boolean containsKey(IntWritable key) {
+      return map.containsKey(key.get());
+    }
+
+    @Override
+    public V put(IntWritable key, V value) {
+      return map.put(key.get(), value);
+    }
+
+    @Override
+    public V get(IntWritable key) {
+      return map.get(key.get());
+    }
+
+    @Override
+    public V remove(IntWritable key) {
+      return map.remove(key.get());
+    }
+
+    @Override
+    public PrimitiveIdTypeOps<IntWritable> getKeyTypeOps() {
+      return IntTypeOps.INSTANCE;
+    }
+
+    @Override
+    public Iterator<IntWritable> fastKeyIterator() {
+      return new ReusableIterator<IntIterator>(map.keySet().iterator()) {
+        @Override
+        public IntWritable next() {
+          reusableKey.set(iter.nextInt());
+          return reusableKey;
+        }
+      };
+    }
+
+    @Override
+    public void write(DataOutput out, WritableWriter<V> writer)
+      throws IOException {
+      out.writeInt(map.size());
+      ObjectIterator<Int2ObjectMap.Entry<V>> iterator =
+          map.int2ObjectEntrySet().fastIterator();
+      while (iterator.hasNext()) {
+        Int2ObjectMap.Entry<V> entry = iterator.next();
+        out.writeInt(entry.getIntKey());
+        writer.write(out, entry.getValue());
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in, WritableWriter<V> writer)
+      throws IOException {
+      int size = in.readInt();
+      map.clear();
+      map.trim(size);
+      while (size-- > 0) {
+        int key = in.readInt();
+        V value = writer.readFields(in);
+        map.put(key, value);
+      }
+    }
+  }
+
+  /** LongWritable implementation of Basic2ObjectMap */
+  public static final class BasicLong2ObjectOpenHashMap<V>
+      extends Basic2ObjectMap<LongWritable, V> {
+    /** Map */
+    private final Long2ObjectOpenHashMap<V> map;
+
+    /**
+     * Constructor
+     * @param capacity Capacity
+     */
+    public BasicLong2ObjectOpenHashMap(int capacity) {
+      this.map = new Long2ObjectOpenHashMap<>(capacity);
+    }
+
+    @Override
+    public void clear() {
+      map.clear();
+    }
+
+    @Override
+    public int size() {
+      return map.size();
+    }
+
+    @Override
+    public boolean containsKey(LongWritable key) {
+      return map.containsKey(key.get());
+    }
+
+    @Override
+    public V put(LongWritable key, V value) {
+      return map.put(key.get(), value);
+    }
+
+    @Override
+    public V get(LongWritable key) {
+      return map.get(key.get());
+    }
+
+    @Override
+    public V remove(LongWritable key) {
+      return map.remove(key.get());
+    }
+
+    @Override
+    public PrimitiveIdTypeOps<LongWritable> getKeyTypeOps() {
+      return LongTypeOps.INSTANCE;
+    }
+
+    @Override
+    public Iterator<LongWritable> fastKeyIterator() {
+      return new ReusableIterator<LongIterator>(map.keySet().iterator()) {
+        @Override
+        public LongWritable next() {
+          reusableKey.set(iter.nextLong());
+          return reusableKey;
+        }
+      };
+    }
+
+    @Override
+    public void write(DataOutput out, WritableWriter<V> writer)
+      throws IOException {
+      out.writeInt(map.size());
+      ObjectIterator<Long2ObjectMap.Entry<V>> iterator =
+          map.long2ObjectEntrySet().fastIterator();
+      while (iterator.hasNext()) {
+        Long2ObjectMap.Entry<V> entry = iterator.next();
+        out.writeLong(entry.getLongKey());
+        writer.write(out, entry.getValue());
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in, WritableWriter<V> writer)
+      throws IOException {
+      int size = in.readInt();
+      map.clear();
+      map.trim(size);
+      while (size-- > 0) {
+        long key = in.readLong();
+        V value = writer.readFields(in);
+        map.put(key, value);
+      }
+    }
+  }
+}


Mime
View raw message