giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apre...@apache.org
Subject [7/8] GIRAPH-528: Decouple vertex implementation from edge storage (apresta)
Date Thu, 07 Mar 2013 05:37:41 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 0fc1858..d1e99cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -39,7 +39,7 @@ import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricNames;
@@ -49,7 +49,7 @@ import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 697b6ce..1fb0580 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -34,7 +34,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.partition.Partition;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index f301bbf..793768a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 5c2a01a..5090250 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -18,24 +18,27 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.partition.DefaultPartitionContext;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.partition.DefaultPartitionContext;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -50,6 +53,7 @@ import java.util.List;
  * @param <E> Edge class
  * @param <M> Message class
  */
+@SuppressWarnings("unchecked")
 public class GiraphClasses<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements GiraphConstants {
@@ -63,6 +67,8 @@ public class GiraphClasses<I extends WritableComparable,
   protected Class<E> edgeValueClass;
   /** Message value class - cached for fast access */
   protected Class<M> messageValueClass;
+  /** Vertex edges class - cached for fast access */
+  protected Class<? extends VertexEdges<I, E>> vertexEdgesClass;
 
   /** Graph partitioner factory class - cached for fast access */
   protected Class<? extends GraphPartitionerFactory<I, V, E, M>>
@@ -96,9 +102,25 @@ public class GiraphClasses<I extends WritableComparable,
   protected Class<? extends Partition<I, V, E, M>> partitionClass;
 
   /**
-   * Empty constructor. Initialize with classes all null.
+   * Empty constructor. Initialize with default classes or null.
    */
-  public GiraphClasses() { }
+  public GiraphClasses() {
+    // Note: the cast to Object is required in order for javac to accept the
+    // downcast.
+    vertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object)
+        ByteArrayEdges.class;
+    graphPartitionerFactoryClass =
+        (Class<? extends GraphPartitionerFactory<I, V, E, M>>) (Object)
+            HashPartitionerFactory.class;
+    aggregatorWriterClass = TextAggregatorWriter.class;
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+        (Object) DefaultVertexResolver.class;
+    partitionContextClass = DefaultPartitionContext.class;
+    workerContextClass = DefaultWorkerContext.class;
+    masterComputeClass = DefaultMasterCompute.class;
+    partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
+        SimplePartition.class;
+  }
 
   /**
    * Contructor that reads classes from a Configuration object.
@@ -118,13 +140,15 @@ public class GiraphClasses<I extends WritableComparable,
     // set pre-validated generic parameter types into Configuration
     vertexClass = (Class<? extends Vertex<I, V, E, M>>)
         conf.getClass(VERTEX_CLASS, null, Vertex.class);
-    List<Class<?>> classList =
-        org.apache.giraph.utils.ReflectionUtils.<Vertex>getTypeArguments(
-            Vertex.class, vertexClass);
+    List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
+        vertexClass);
     vertexIdClass = (Class<I>) classList.get(0);
     vertexValueClass = (Class<V>) classList.get(1);
     edgeValueClass = (Class<E>) classList.get(2);
     messageValueClass = (Class<M>) classList.get(3);
+    vertexEdgesClass = (Class<? extends VertexEdges<I, E>>)
+        conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class,
+            VertexEdges.class);
 
     graphPartitionerFactoryClass =
         (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
@@ -206,12 +230,12 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Check if we GraphPartitionerFactory is set
+   * Get Vertex edges class
    *
-   * @return true if GraphPartitionerFactory is set
+   * @return Vertex edges class.
    */
-  public boolean hasGraphPartitionerFactoryClass() {
-    return graphPartitionerFactoryClass != null;
+  public Class<? extends VertexEdges<I, E>> getVertexEdgesClass() {
+    return vertexEdgesClass;
   }
 
   /**
@@ -463,6 +487,19 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Set VertexEdges class held
+   *
+   * @param vertexEdgesClass Vertex edges class to set
+   * @return this
+   */
+  public GiraphClasses setVertexEdgesClass(
+      Class<? extends VertexEdges> vertexEdgesClass) {
+    this.vertexEdgesClass =
+        (Class<? extends VertexEdges<I, E>>) vertexEdgesClass;
+    return this;
+  }
+
+  /**
    * Set GraphPartitionerFactory class held
    *
    * @param klass GraphPartitionerFactory to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 3ea8d3b..6886d58 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -31,7 +32,7 @@ import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
@@ -74,6 +75,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the vertex edges class
+   *
+   * @param vertexEdgesClass Determines the way edges are stored
+   */
+  public final void setVertexEdgesClass(
+      Class<? extends VertexEdges> vertexEdgesClass) {
+    setClass(VERTEX_EDGES_CLASS, vertexEdgesClass, VertexEdges.class);
+  }
+
+  /**
    * Set the vertex input format class (required)
    *
    * @param vertexInputFormatClass Determines how graph is input

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index fcdd57b..ad9073d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -24,6 +24,8 @@ package org.apache.giraph.conf;
 public interface GiraphConstants {
   /** Vertex class - required */
   String VERTEX_CLASS = "giraph.vertexClass";
+  /** Vertex edges class - required */
+  String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass";
 
   /** Class for Master - optional */
   String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index e6c4cc6..8457b8b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,10 +20,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.graph.MutableEdge;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -32,10 +29,8 @@ import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
-import org.apache.giraph.partition.MasterGraphPartitioner;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataInput;
@@ -43,7 +38,11 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
@@ -62,12 +61,10 @@ import org.apache.hadoop.util.Progressable;
  * @param <E> Edge data
  * @param <M> Message data
  */
+@SuppressWarnings("unchecked")
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends GiraphConfiguration {
-  /** Master graph partitioner - cached for fast access */
-  protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
-
   /** Holder for all the classes */
   private final GiraphClasses classes;
 
@@ -85,11 +82,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    */
   public ImmutableClassesGiraphConfiguration(Configuration conf) {
     super(conf);
-
     classes = new GiraphClasses(conf);
-    masterGraphPartitioner = (MasterGraphPartitioner<I, V, E, M>)
-        createGraphPartitioner().createMasterGraphPartitioner();
-
     useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
         USE_UNSAFE_SERIALIZATION_DEFAULT);
   }
@@ -127,24 +120,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a user graph partitioner partition stats class
-   *
-   * @return Instantiated user graph partition stats class
-   */
-  public PartitionStats createGraphPartitionStats() {
-    return getMasterGraphPartitioner().createPartitionStats();
-  }
-
-  /**
-   * Get the cached MasterGraphPartitioner.
-   *
-   * @return MasterGraphPartitioner cached in this class.
-   */
-  public MasterGraphPartitioner<I, V, E, M> getMasterGraphPartitioner() {
-    return masterGraphPartitioner;
-  }
-
-  /**
    * Does the job have a {@link VertexInputFormat}?
    *
    * @return True iff a {@link VertexInputFormat} has been specified.
@@ -177,6 +152,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Does the job have a {@link VertexOutputFormat}?
+   *
+   * @return True iff a {@link VertexOutputFormat} has been specified.
+   */
+  public boolean hasVertexOutputFormat() {
+    return classes.hasVertexOutputFormat();
+  }
+
+  /**
    * Get the user's subclassed
    * {@link org.apache.giraph.io.VertexOutputFormat}.
    *
@@ -247,6 +231,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the user's subclassed {@link Combiner} class.
+   *
+   * @return User's combiner class
+   */
+  public Class<? extends Combiner<I, M>> getCombinerClass() {
+    return classes.getCombinerClass();
+  }
+
+  /**
    * Create a user combiner class
    *
    * @return Instantiated user combiner class
@@ -350,7 +343,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Get the user's subclassed {@link org.apache.giraph.vertex.Vertex}
+   * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
    *
    * @return User's vertex class
    */
@@ -560,6 +553,51 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the user's subclassed {@link VertexEdges}
+   *
+   * @return User's vertex edges class
+   */
+  public Class<? extends VertexEdges<I, E>> getVertexEdgesClass() {
+    return classes.getVertexEdgesClass();
+  }
+
+  /**
+   * Create a user {@link VertexEdges}
+   *
+   * @return Instantiated user VertexEdges
+   */
+  public VertexEdges<I, E> createVertexEdges() {
+    return ReflectionUtils.newInstance(getVertexEdgesClass(), this);
+  }
+
+  /**
+   * Create a {@link VertexEdges} instance and initialize it with the given
+   * capacity (the number of edges that will be added).
+   *
+   * @param capacity Number of edges that will be added
+   * @return Instantiated VertexEdges
+   */
+  public VertexEdges<I, E> createAndInitializeVertexEdges(int capacity) {
+    VertexEdges<I, E> vertexEdges = createVertexEdges();
+    vertexEdges.initialize(capacity);
+    return vertexEdges;
+  }
+
+  /**
+   * Create a {@link VertexEdges} instance and initialize it with the given
+   * iterable of edges.
+   *
+   * @param edges Iterable of edges to add
+   * @return Instantiated VertexEdges
+   */
+  public VertexEdges<I, E> createAndInitializeVertexEdges(
+      Iterable<Edge<I, E>> edges) {
+    VertexEdges<I, E> vertexEdges = createVertexEdges();
+    vertexEdges.initialize(edges);
+    return vertexEdges;
+  }
+
+  /**
    * Create a partition
    *
    * @param id Partition id

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
new file mode 100644
index 0000000..68d4ec0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation backed by an {@link ArrayList}.
+ * Parallel edges are allowed.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class ArrayListEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E> {
+  /** List of edges. */
+  private ArrayList<Edge<I, E>> edgeList;
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    if (edges != null) {
+      // If the iterable is actually an instance of ArrayList,
+      // we simply copy the reference.
+      // Otherwise we have to add every edge.
+      if (edges instanceof ArrayList) {
+        edgeList = (ArrayList<Edge<I, E>>) edges;
+      } else {
+        edgeList = Lists.newArrayList(edges);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeList = Lists.newArrayListWithCapacity(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeList = Lists.newArrayList();
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    edgeList.add(edge);
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext();) {
+      Edge<I, E> edge = edges.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        edges.remove();
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return edgeList.size();
+  }
+
+  @Override
+  public final Iterator<Edge<I, E>> iterator() {
+    return edgeList.iterator();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeList.size());
+    for (Edge<I, E> edge : edgeList) {
+      edge.getTargetVertexId().write(out);
+      edge.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      Edge<I, E> edge = getConf().createEdge();
+      WritableUtils.readEdge(in, edge);
+      edgeList.add(edge);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
new file mode 100644
index 0000000..be74ad1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * {@link VertexEdges} implementation backed by a byte array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E>
+    implements ReuseObjectsVertexEdges<I, E> {
+  /** Serialized edges. */
+  private byte[] serializedEdges;
+  /** Number of bytes used in serializedEdges. */
+  private int serializedEdgesBytesUsed;
+  /** Number of edges. */
+  private int edgeCount;
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    ExtendedDataOutput extendedOutputStream =
+        getConf().createExtendedDataOutput();
+    if (edges != null) {
+      for (Edge<I, E> edge : edges) {
+        try {
+          WritableUtils.writeEdge(extendedOutputStream, edge);
+        } catch (IOException e) {
+          throw new IllegalStateException("initialize: Failed to serialize " +
+              edge);
+        }
+        ++edgeCount;
+      }
+    }
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    // We have no way to know the size in bytes used by a certain
+    // number of edges.
+    initialize();
+  }
+
+  @Override
+  public void initialize() {
+    // No-op: no need to initialize the byte-array if there are no edges,
+    // since add() and iterator() work fine with a null buffer.
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    ExtendedDataOutput extendedDataOutput =
+        getConf().createExtendedDataOutput(
+            serializedEdges, serializedEdgesBytesUsed);
+    try {
+      WritableUtils.writeEdge(extendedDataOutput, edge);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: Failed to write to the new " +
+          "byte array");
+    }
+    serializedEdges = extendedDataOutput.getByteArray();
+    serializedEdgesBytesUsed = extendedDataOutput.getPos();
+    ++edgeCount;
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges).
+    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
+    List<Integer> foundStartOffsets = new LinkedList<Integer>();
+    List<Integer> foundEndOffsets = new LinkedList<Integer>();
+    int lastStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        foundStartOffsets.add(lastStartOffset);
+        foundEndOffsets.add(iterator.extendedDataInput.getPos());
+        --edgeCount;
+      }
+      lastStartOffset = iterator.extendedDataInput.getPos();
+    }
+    foundStartOffsets.add(serializedEdgesBytesUsed);
+
+    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
+    Integer foundStartOffset = foundStartOffsetIter.next();
+    for (Integer foundEndOffset : foundEndOffsets) {
+      Integer nextFoundStartOffset = foundStartOffsetIter.next();
+      System.arraycopy(serializedEdges, foundEndOffset,
+          serializedEdges, foundStartOffset,
+          nextFoundStartOffset - foundEndOffset);
+      serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
+      foundStartOffset = nextFoundStartOffset;
+    }
+  }
+
+  @Override
+  public int size() {
+    return edgeCount;
+  }
+
+  /**
+   * Iterator that reuses the same Edge object.
+   */
+  private class ByteArrayEdgeIterator
+      extends UnmodifiableIterator<Edge<I, E>> {
+    /** Input for processing the bytes */
+    private ExtendedDataInput extendedDataInput =
+        getConf().createExtendedDataInput(
+            serializedEdges, 0, serializedEdgesBytesUsed);
+    /** Representative edge object. */
+    private MutableEdge<I, E> representativeEdge =
+        getConf().createMutableEdge();
+
+    @Override
+    public boolean hasNext() {
+      return serializedEdges != null && extendedDataInput.available() > 0;
+    }
+
+    @Override
+    public Edge<I, E> next() {
+      try {
+        WritableUtils.readEdge(extendedDataInput, representativeEdge);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: Failed on pos " +
+            extendedDataInput.getPos() + " edge " + representativeEdge);
+      }
+      return representativeEdge;
+    }
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    return new ByteArrayEdgeIterator();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    serializedEdgesBytesUsed = in.readInt();
+    if (serializedEdgesBytesUsed > 0) {
+      // Only create a new buffer if the old one isn't big enough
+      if (serializedEdges == null ||
+          serializedEdgesBytesUsed > serializedEdges.length) {
+        serializedEdges = new byte[serializedEdgesBytesUsed];
+      }
+      in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    edgeCount = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(serializedEdgesBytesUsed);
+    if (serializedEdgesBytesUsed > 0) {
+      out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    out.writeInt(edgeCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
new file mode 100644
index 0000000..faa12eb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableVertexEdges.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract base class for {@link VertexEdges} implementations that require
+ * access to the configuration.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+@SuppressWarnings("unchecked")
+public abstract class ConfigurableVertexEdges<I extends WritableComparable,
+    E extends Writable>
+    implements VertexEdges<I, E>, ImmutableClassesGiraphConfigurable {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration;
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    configuration = conf;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
+    return configuration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
new file mode 100644
index 0000000..461bff3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A complete edge, the target vertex and the edge value.  Can only be one
+ * edge with a destination vertex id per edge map.
+ *
+ * @param <I> Vertex index
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultEdge<I extends WritableComparable, E extends Writable>
+    implements MutableEdge<I, E> {
+  /** Target vertex id */
+  private I targetVertexId = null;
+  /** Edge value */
+  private E value = null;
+
+  /**
+   * Constructor for reflection
+   */
+  public DefaultEdge() { }
+
+  /**
+   * Create the edge with final values. Don't call, use EdgeFactory instead.
+   *
+   * @param targetVertexId Desination vertex id.
+   * @param value Value of the edge.
+   */
+  DefaultEdge(I targetVertexId, E value) {
+    this.targetVertexId = targetVertexId;
+    this.value = value;
+  }
+
+  @Override
+  public I getTargetVertexId() {
+    return targetVertexId;
+  }
+
+  @Override
+  public E getValue() {
+    return value;
+  }
+
+  @Override
+  public void setTargetVertexId(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public void setValue(E value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "(targetVertexId = " + targetVertexId + ", " +
+        "value = " + value + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java b/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
new file mode 100644
index 0000000..4649da1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/Edge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A complete edge, the target vertex and the edge value.  Can only be one
+ * edge with a destination vertex id per edge map.
+ *
+ * @param <I> Vertex index
+ * @param <E> Edge value
+ */
+public interface Edge<I extends WritableComparable, E extends Writable> {
+  /**
+   * Get the target vertex index of this edge
+   *
+   * @return Target vertex index of this edge
+   */
+  I getTargetVertexId();
+
+  /**
+   * Get the edge value of the edge
+   *
+   * @return Edge value of this edge
+   */
+  E getValue();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
new file mode 100644
index 0000000..3599207
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for creating Edges
+ */
+public class EdgeFactory {
+  /** Do not construct */
+  private EdgeFactory() { }
+
+  /**
+   * Create an edge pointing to a given ID with a value
+   *
+   * @param id target ID
+   * @param value edge value
+   * @param <I> Vertex ID type
+   * @param <E> Edge Value type
+   * @return Edge pointing to ID with value
+   */
+  public static <I extends WritableComparable,
+                 E extends Writable>
+  Edge<I, E> create(I id, E value) {
+    return createMutable(id, value);
+  }
+
+  /**
+   * Create an edge pointing to a given ID without a value
+   *
+   * @param id target ID
+   * @param <I> Vertex ID type
+   * @return Edge pointing to ID without a value
+   */
+  public static <I extends WritableComparable>
+  Edge<I, NullWritable> create(I id) {
+    return createMutable(id);
+  }
+
+  /**
+   * Create a mutable edge pointing to a given ID with a value
+   *
+   * @param id target ID
+   * @param value edge value
+   * @param <I> Vertex ID type
+   * @param <E> Edge Value type
+   * @return Edge pointing to ID with value
+   */
+  public static <I extends WritableComparable,
+                 E extends Writable>
+  MutableEdge<I, E> createMutable(I id, E value) {
+    if (value instanceof NullWritable) {
+      return (MutableEdge<I, E>) createMutable(id);
+    } else {
+      return new DefaultEdge<I, E>(id, value);
+    }
+  }
+
+  /**
+   * Create a mutable edge pointing to a given ID with a value
+   *
+   * @param id target ID
+   * @param <I> Vertex ID type
+   * @return Edge pointing to ID with value
+   */
+  public static <I extends WritableComparable>
+  MutableEdge<I, NullWritable> createMutable(I id) {
+    return new EdgeNoValue<I>(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
new file mode 100644
index 0000000..dd22aec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An edge that has no value.
+ *
+ * @param <I> Vertex ID
+ */
+public class EdgeNoValue<I extends WritableComparable>
+    implements MutableEdge<I, NullWritable> {
+  /** Target vertex id */
+  private I targetVertexId = null;
+
+  /** Empty constructor */
+  EdgeNoValue() { }
+
+  /**
+   * Constructor with target vertex ID. Don't call, use EdgeFactory instead.
+   *
+   * @param targetVertexId vertex ID
+   */
+  EdgeNoValue(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public void setTargetVertexId(I targetVertexId) {
+    this.targetVertexId = targetVertexId;
+  }
+
+  @Override
+  public void setValue(NullWritable value) {
+    // do nothing
+  }
+
+  @Override
+  public I getTargetVertexId() {
+    return targetVertexId;
+  }
+
+  @Override
+  public NullWritable getValue() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public String toString() {
+    return "(targetVertexId = " + targetVertexId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
new file mode 100644
index 0000000..64569bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Collects incoming edges for vertices owned by this worker.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
+  /** Service worker. */
+  private CentralizedServiceWorker<I, V, E, M> service;
+  /** Giraph configuration. */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Progressable to report progress. */
+  private Progressable progressable;
+  /** Map used to temporarily store incoming edges. */
+  private ConcurrentMap<Integer,
+      ConcurrentMap<I, VertexEdges<I, E>>> transientEdges;
+  /**
+   * Whether the chosen {@link VertexEdges} implementation allows for Edge
+   * reuse.
+   */
+  private boolean reuseEdgeObjects;
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public EdgeStore(
+      CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      Progressable progressable) {
+    this.service = service;
+    this.configuration = configuration;
+    this.progressable = progressable;
+    transientEdges = new MapMaker().concurrencyLevel(
+        configuration.getNettyServerExecutionConcurrency()).makeMap();
+    reuseEdgeObjects = ReuseObjectsVertexEdges.class.isAssignableFrom(
+        configuration.getVertexEdgesClass());
+  }
+
+  /**
+   * Add edges belonging to a given partition on this worker.
+   * Note: This method is thread-safe.
+   *
+   * @param partitionId Partition id for the incoming edges.
+   * @param edges Incoming edges
+   */
+  public void addPartitionEdges(
+      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+        transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, VertexEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      partitionEdges = transientEdges.putIfAbsent(partitionId,
+          newPartitionEdges);
+      if (partitionEdges == null) {
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+        edges.getVertexIdEdgeIterator();
+    while (vertexIdEdgeIterator.hasNext()) {
+      vertexIdEdgeIterator.next();
+      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+      Edge<I, E> edge = reuseEdgeObjects ?
+          vertexIdEdgeIterator.getCurrentEdge() :
+          vertexIdEdgeIterator.releaseCurrentEdge();
+      VertexEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
+      if (vertexEdges == null) {
+        VertexEdges<I, E> newVertexEdges = configuration.createVertexEdges();
+        vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
+        if (vertexEdges == null) {
+          vertexEdges = newVertexEdges;
+          // Only initialize the new vertex once we are sure it's going to be
+          // used.
+          vertexEdges.initialize();
+          // Since we had to use the vertex id as a new key in the map,
+          // we need to release the object.
+          vertexIdEdgeIterator.releaseCurrentVertexId();
+        }
+      }
+      synchronized (vertexEdges) {
+        vertexEdges.add(edge);
+      }
+    }
+  }
+
+  /**
+   * Move all edges from temporary storage to their source vertices.
+   * Note: this method is not thread-safe.
+   */
+  public void moveEdgesToVertices() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+    }
+    for (Map.Entry<Integer, ConcurrentMap<I,
+        VertexEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
+      Partition<I, V, E, M> partition =
+          service.getPartitionStore().getPartition(partitionEdges.getKey());
+      for (I vertexId : partitionEdges.getValue().keySet()) {
+        VertexEdges<I, E> vertexEdges =
+            partitionEdges.getValue().remove(vertexId);
+        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+        // If the source vertex doesn't exist, create it. Otherwise,
+        // just set the edges.
+        if (vertex == null) {
+          vertex = configuration.createVertex();
+          vertex.initialize(vertexId, configuration.createVertexValue(),
+              vertexEdges);
+          partition.putVertex(vertex);
+        } else {
+          vertex.setEdges(vertexEdges);
+          // Some Partition implementations (e.g. ByteArrayPartition) require
+          // us to put back the vertex after modifying it.
+          partition.saveVertex(vertex);
+        }
+        progressable.progress();
+      }
+      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
+      // require us to put back the partition after modifying it.
+      service.getPartitionStore().putPartition(partition);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+          "vertices.");
+    }
+    transientEdges.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
new file mode 100644
index 0000000..1aa9a46
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link VertexEdges} implementation backed by a {@link HashMap}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * but uses more space.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HashMapEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E>
+    implements StrictRandomAccessVertexEdges<I, E> {
+  /** Map from target vertex id to edge value. */
+  private HashMap<I, E> edgeMap;
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    if (edges != null) {
+      // If the iterable is actually a collection, we can cheaply get the
+      // size and initialize the hash-map with the expected capacity.
+      if (edges instanceof Collection) {
+        initialize(((Collection<Edge<I, E>>) edges).size());
+      } else {
+        initialize();
+      }
+      for (Edge<I, E> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeMap = Maps.newHashMapWithExpectedSize(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMap = Maps.newHashMap();
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    edgeMap.put(edge.getTargetVertexId(), edge.getValue());
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    edgeMap.remove(targetVertexId);
+  }
+
+  @Override
+  public E getEdgeValue(I targetVertexId) {
+    return edgeMap.get(targetVertexId);
+  }
+
+  @Override
+  public int size() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<I, E>>() {
+      /** Wrapped map iterator. */
+      private Iterator<Map.Entry<I, E>> mapIterator =
+          edgeMap.entrySet().iterator();
+      /** Representative edge object. */
+      private MutableEdge<I, E> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<I, E> next() {
+        Map.Entry<I, E> nextEntry = mapIterator.next();
+        representativeEdge.setTargetVertexId(nextEntry.getKey());
+        representativeEdge.setValue(nextEntry.getValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeMap.size());
+    for (Map.Entry<I, E> entry : edgeMap.entrySet()) {
+      entry.getKey().write(out);
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edgeMap.put(targetVertexId, edgeValue);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
new file mode 100644
index 0000000..143d7a4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link VertexEdges} implementation backed by an {@link ArrayListMultimap}.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for fast mutations,
+ * but uses more space.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class HashMultimapEdges<I extends WritableComparable, E extends Writable>
+    extends ConfigurableVertexEdges<I, E>
+    implements MultiRandomAccessVertexEdges<I, E> {
+  /** Multimap from target vertex id to edge values. */
+  private ArrayListMultimap<I, E> edgeMultimap;
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    // If the iterable is actually a collection, we can cheaply get the
+    // size and initialize the hash-multimap with the expected capacity.
+    if (edges instanceof Collection) {
+      initialize(((Collection<Edge<I, E>>) edges).size());
+    } else {
+      initialize();
+    }
+    for (Edge<I, E> edge : edges) {
+      add(edge);
+    }
+  }
+
+  /**
+   * Additional initialization method tailored to the underlying multimap
+   * implementation.
+   *
+   * @param expectedNeighbors Expected number of unique neighbors
+   * @param expectedEdgesPerNeighbor Expected number of edges per neighbor
+   */
+  public void initialize(int expectedNeighbors, int expectedEdgesPerNeighbor) {
+    edgeMultimap = ArrayListMultimap.create(expectedNeighbors,
+        expectedEdgesPerNeighbor);
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    // To be conservative in terms of space usage, we assume that the initial
+    // number of values per key is 1.
+    initialize(capacity, 1);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMultimap = ArrayListMultimap.create();
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    edgeMultimap.put(edge.getTargetVertexId(), edge.getValue());
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    edgeMultimap.removeAll(targetVertexId);
+  }
+
+  @Override
+  public Iterable<E> getAllEdgeValues(I targetVertexId) {
+    return edgeMultimap.get(targetVertexId);
+  }
+
+  @Override
+  public int size() {
+    return edgeMultimap.size();
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<I, E>>() {
+      /** Wrapped map iterator. */
+      private Iterator<Map.Entry<I, E>> mapIterator =
+          edgeMultimap.entries().iterator();
+      /** Representative edge object. */
+      private MutableEdge<I, E> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<I, E> next() {
+        Map.Entry<I, E> nextEntry = mapIterator.next();
+        representativeEdge.setTargetVertexId(nextEntry.getKey());
+        representativeEdge.setValue(nextEntry.getValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // We write both the total number of edges and the number of unique
+    // neighbors.
+    out.writeInt(edgeMultimap.size());
+    out.writeInt(edgeMultimap.keys().size());
+    for (Map.Entry<I, E> edge : edgeMultimap.entries()) {
+      edge.getKey().write(out);
+      edge.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Given the total number of pairs and the number of unique neighbors,
+    // we are able to compute the average number of edges per neighbors.
+    int numEdges = in.readInt();
+    int numNeighbors = in.readInt();
+    initialize(numEdges, numNeighbors == 0 ? 0 : numEdges / numNeighbors);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edgeMultimap.put(targetVertexId, edgeValue);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
new file mode 100644
index 0000000..9df58a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VertexEdges} with long ids and double edge
+ * values, backed by dynamic primitive arrays.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ */
+public class LongDoubleArrayEdges
+    extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
+    implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+  /** Array of target vertex ids. */
+  private LongArrayList neighbors;
+  /** Array of edge values. */
+  private DoubleArrayList edgeValues;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+    if (edges != null) {
+      // If the iterable is actually a collection, we can cheaply get the
+      // size and initialize the arrays with the expected capacity.
+      if (edges instanceof Collection) {
+        int numEdges =
+            ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size();
+        initialize(numEdges);
+      } else {
+        initialize();
+      }
+      for (Edge<LongWritable, DoubleWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongArrayList(capacity);
+    edgeValues = new DoubleArrayList(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongArrayList();
+    edgeValues = new DoubleArrayList();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, DoubleWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+    edgeValues.add(edge.getValue().get());
+  }
+
+  /**
+   * If the backing arrays are more than four times as big as the number of
+   * elements, halve their size.
+   */
+  private void trim() {
+    if (neighbors.elements().length > 4 * neighbors.size()) {
+      neighbors.trim(neighbors.elements().length / 2);
+      edgeValues.trim(neighbors.elements().length / 2);
+    }
+  }
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void remove(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    if (i == neighbors.size() - 1) {
+      neighbors.popLong();
+      edgeValues.popDouble();
+    } else {
+      neighbors.set(i, neighbors.popLong());
+      edgeValues.set(i, edgeValues.popDouble());
+    }
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    // Thanks to the constant-time implementation of remove(int),
+    // we can remove all matching edges in linear time.
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      if (neighbors.get(i) == targetVertexId.get()) {
+        remove(i);
+      }
+    }
+    trim();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, DoubleWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Wrapped edge values iterator. */
+      private DoubleIterator edgeValuesIt = edgeValues.iterator();
+      /** Representative edge object. */
+      private Edge<LongWritable, DoubleWritable> representativeEdge =
+          getConf().createEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, DoubleWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        representativeEdge.getValue().set(edgeValuesIt.nextDouble());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    DoubleIterator edgeValuesIt = edgeValues.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+      out.writeDouble(edgeValuesIt.nextDouble());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+      edgeValues.add(in.readDouble());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
new file mode 100644
index 0000000..6d17b4b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
+import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation with long ids and double edge values,
+ * backed by a {@link Long2DoubleOpenHashMap}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges} (but more than
+ * {@link LongDoubleArrayEdges}.
+ */
+public class LongDoubleHashMapEdges
+    extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
+    implements StrictRandomAccessVertexEdges<LongWritable, DoubleWritable>,
+    ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+  /** Hash map from target vertex id to edge value. */
+  private Long2DoubleOpenHashMap edgeMap;
+  /** Representative edge value object, used by getEdgeValue(). */
+  private DoubleWritable representativeEdgeValue;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+    if (edges != null) {
+      // If the iterable is actually a collection, we can cheaply get the
+      // size and initialize the hash-map with the expected capacity.
+      if (edges instanceof Collection) {
+        initialize(
+            ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size());
+      } else {
+        initialize();
+      }
+      for (Edge<LongWritable, DoubleWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeMap = new Long2DoubleOpenHashMap(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMap = new Long2DoubleOpenHashMap();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, DoubleWritable> edge) {
+    edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    edgeMap.remove(targetVertexId.get());
+  }
+
+  @Override
+  public DoubleWritable getEdgeValue(LongWritable targetVertexId) {
+    if (!edgeMap.containsKey(targetVertexId.get())) {
+      return null;
+    }
+    if (representativeEdgeValue == null) {
+      representativeEdgeValue = getConf().createEdgeValue();
+    }
+    representativeEdgeValue.set(edgeMap.get(targetVertexId.get()));
+    return representativeEdgeValue;
+  }
+
+  @Override
+  public int size() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, DoubleWritable>>() {
+      /** Wrapped map iterator. */
+      private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
+          edgeMap.long2DoubleEntrySet().fastIterator();
+      /** Representative edge object. */
+      private MutableEdge<LongWritable, DoubleWritable> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, DoubleWritable> next() {
+        Long2DoubleMap.Entry nextEntry = mapIterator.next();
+        representativeEdge.getTargetVertexId().set(nextEntry.getLongKey());
+        representativeEdge.getValue().set(nextEntry.getDoubleValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeMap.size());
+    for (Long2DoubleMap.Entry entry : edgeMap.long2DoubleEntrySet()) {
+      out.writeLong(entry.getLongKey());
+      out.writeDouble(entry.getDoubleValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      edgeMap.put(in.readLong(), in.readDouble());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
new file mode 100644
index 0000000..a3b869a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VertexEdges} with long ids and null edge
+ * values, backed by a dynamic primitive array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but random access and edge removals are expensive.
+ */
+public class LongNullArrayEdges
+    extends ConfigurableVertexEdges<LongWritable, NullWritable>
+    implements ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+  /** Array of target vertex ids. */
+  private LongArrayList neighbors;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    if (edges != null) {
+      // If the iterable is actually a collection, we can cheaply get the
+      // size and initialize the arrays with the expected capacity.
+      if (edges instanceof Collection) {
+        int numEdges =
+            ((Collection<Edge<LongWritable, NullWritable>>) edges).size();
+        initialize(numEdges);
+      } else {
+        initialize();
+      }
+      for (Edge<LongWritable, NullWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongArrayList(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongArrayList();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+  }
+
+  /**
+   * If the backing array is more than four times as big as the number of
+   * elements, halve its size.
+   */
+  private void trim() {
+    if (neighbors.elements().length > 4 * neighbors.size()) {
+      neighbors.trim(neighbors.elements().length / 2);
+    }
+  }
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void remove(int i) {
+    // The order of the edges is irrelevant, so we can simply replace
+    // the deleted edge with the rightmost element, thus achieving constant
+    // time.
+    if (i == neighbors.size() - 1) {
+      neighbors.popLong();
+    } else {
+      neighbors.set(i, neighbors.popLong());
+    }
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    // Thanks to the constant-time implementation of remove(int),
+    // we can remove all matching edges in linear time.
+    for (int i = neighbors.size() - 1; i >= 0; --i) {
+      if (neighbors.get(i) == targetVertexId.get()) {
+        remove(i);
+      }
+    }
+    trim();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Representative edge object. */
+      private Edge<LongWritable, NullWritable> representativeEdge =
+          getConf().createEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
new file mode 100644
index 0000000..70e69c4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.UnmodifiableIterator;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * {@link VertexEdges} implementation with long ids and null edge values,
+ * backed by a {@link LongOpenHashSet}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges} (but more than
+ * {@link LongNullArrayEdges}.
+ */
+public class LongNullHashSetEdges
+    extends ConfigurableVertexEdges<LongWritable, NullWritable>
+    implements StrictRandomAccessVertexEdges<LongWritable, NullWritable>,
+    ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+  /** Hash set of target vertex ids. */
+  private LongOpenHashSet neighbors;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    if (edges != null) {
+      // If the iterable is actually a collection, we can cheaply get the
+      // size and initialize the hash-map with the expected capacity.
+      if (edges instanceof Collection) {
+        initialize(
+            ((Collection<Edge<LongWritable, NullWritable>>) edges).size());
+      } else {
+        initialize();
+      }
+      for (Edge<LongWritable, NullWritable> edge : edges) {
+        add(edge);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    neighbors = new LongOpenHashSet(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    neighbors = new LongOpenHashSet();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, NullWritable> edge) {
+    neighbors.add(edge.getTargetVertexId().get());
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    neighbors.remove(targetVertexId.get());
+  }
+
+  @Override
+  public NullWritable getEdgeValue(LongWritable targetVertexId) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public int size() {
+    return neighbors.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+      /** Wrapped neighbors iterator. */
+      private LongIterator neighborsIt = neighbors.iterator();
+      /** Representative edge object. */
+      private MutableEdge<LongWritable, NullWritable> representativeEdge =
+          getConf().createMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return neighborsIt.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(neighbors.size());
+    LongIterator neighborsIt = neighbors.iterator();
+    while (neighborsIt.hasNext()) {
+      out.writeLong(neighborsIt.nextLong());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      neighbors.add(in.readLong());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
new file mode 100644
index 0000000..9f8658e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MultiRandomAccessVertexEdges.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for {@link VertexEdges} implementations that provide efficient
+ * random access to the edges given the target vertex id.
+ * This version is for multigraphs (i.e. there can be parallel edges).
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface MultiRandomAccessVertexEdges<I extends WritableComparable,
+    E extends Writable> extends VertexEdges<I, E> {
+  /**
+   * Return an iterable over the edge values for a given target vertex id.
+   *
+   * @param targetVertexId Target vertex id
+   * @return Iterable of edge values
+   */
+  Iterable<E> getAllEdgeValues(I targetVertexId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
new file mode 100644
index 0000000..bf00b4f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A complete edge, the target vertex and the edge value.  Can only be one
+ * edge with a destination vertex id per edge map. This edge can be mutated,
+ * that is you can set it's target vertex ID and edge value.
+ *
+ * @param <I> Vertex index
+ * @param <E> Edge value
+ */
+public interface MutableEdge<I extends WritableComparable, E extends Writable>
+    extends Edge<I, E> {
+  /**
+   * Set the destination vertex index of this edge.
+   *
+   * @param targetVertexId new destination vertex
+   */
+  void setTargetVertexId(I targetVertexId);
+
+  /**
+   * Set the value for this edge.
+   *
+   * @param value new edge value
+   */
+  void setValue(E value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
new file mode 100644
index 0000000..7704baf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ReuseObjectsVertexEdges.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Empty interface to characterize {@link VertexEdges} implementations that
+ * don't keep references to the Edge (or id and value) objects they are passed.
+ * The Giraph infrastructure can exploit this characteristic by reusing Edge
+ * objects.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface ReuseObjectsVertexEdges<I extends WritableComparable,
+    E extends Writable> extends VertexEdges<I, E> { }


Mime
View raw message