giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject [3/4] Everything compiles. All tests should run. Next step is to add a test for the vertex combiner. Should have fixed. Fixed one bug for byte array partition. Fixed another bug for too small of a message buffer. Rebased. Rebased. Passes tests. Need to
Date Wed, 09 Oct 2013 06:35:19 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index f97446f..3337621 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -19,13 +19,15 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -92,11 +94,14 @@ public class GiraphClasses<I extends WritableComparable,
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
-  /** Combiner class - cached for fast access */
-  protected Class<? extends Combiner<I, ? extends Writable>> combinerClass;
+  /** Message combiner class - cached for fast access */
+  protected Class<? extends MessageCombiner<I, ? extends Writable>>
+  messageCombinerClass;
 
   /** Vertex resolver class - cached for fast access */
   protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
+  /** Vertex value combiner class - cached for fast access */
+  protected Class<? extends VertexValueCombiner<V>> vertexValueCombinerClass;
   /** Worker context class - cached for fast access */
   protected Class<? extends WorkerContext> workerContextClass;
   /** Master compute class - cached for fast access */
@@ -131,6 +136,8 @@ public class GiraphClasses<I extends WritableComparable,
     aggregatorWriterClass = TextAggregatorWriter.class;
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         (Object) DefaultVertexResolver.class;
+    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+        (Object) DefaultVertexValueCombiner.class;
     workerContextClass = DefaultWorkerContext.class;
     masterComputeClass = DefaultMasterCompute.class;
     partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
@@ -176,10 +183,13 @@ public class GiraphClasses<I extends WritableComparable,
         EDGE_OUTPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
-    combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
-        VERTEX_COMBINER_CLASS.get(conf);
+    messageCombinerClass =
+        (Class<? extends MessageCombiner<I, ? extends Writable>>)
+        MESSAGE_COMBINER_CLASS.get(conf);
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         VERTEX_RESOLVER_CLASS.get(conf);
+    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+        VERTEX_VALUE_COMBINER_CLASS.get(conf);
     workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
     masterComputeClass =  MASTER_COMPUTE_CLASS.get(conf);
     partitionClass = (Class<? extends Partition<I, V, E>>)
@@ -390,21 +400,22 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Check if Combiner is set
+   * Check if MessageCombiner is set
    *
-   * @return true if Combiner is set
+   * @return true if MessageCombiner is set
    */
-  public boolean hasCombinerClass() {
-    return combinerClass != null;
+  public boolean hasMessageCombinerClass() {
+    return messageCombinerClass != null;
   }
 
   /**
-   * Get Combiner used
+   * Get MessageCombiner used
    *
-   * @return Combiner
+   * @return MessageCombiner
    */
-  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
-    return combinerClass;
+  public Class<? extends MessageCombiner<I, ? extends Writable>>
+  getMessageCombinerClass() {
+    return messageCombinerClass;
   }
 
   /**
@@ -426,6 +437,15 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Get VertexValueCombiner used
+   *
+   * @return VertexValueCombiner
+   */
+  public Class<? extends VertexValueCombiner<V>> getVertexValueCombinerClass() {
+    return vertexValueCombinerClass;
+  }
+
+  /**
    * Check if WorkerContext is set
    *
    * @return true if WorkerContext is set
@@ -639,14 +659,14 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Set Combiner class used
+   * Set MessageCombiner class used
    *
-   * @param combinerClass Combiner class to set
+   * @param combinerClass MessageCombiner class to set
    * @return this
    */
-  public GiraphClasses setCombinerClass(
-      Class<? extends Combiner<I, ? extends Writable>> combinerClass) {
-    this.combinerClass = combinerClass;
+  public GiraphClasses setMessageCombiner(
+      Class<? extends MessageCombiner<I, ? extends Writable>> combinerClass) {
+    this.messageCombinerClass = combinerClass;
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 15ff861..4dee396 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -19,13 +19,14 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
 import org.apache.giraph.factories.ComputationFactory;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
-import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -96,7 +97,7 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the user's subclassed {@link org.apache.giraph.graph.Computation}
+   * Get the user's subclassed {@link Computation}
    *
    * @return User's computation class
    */
@@ -467,22 +468,22 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the vertex combiner class (optional)
+   * Get the message combiner class (optional)
    *
-   * @return vertexCombinerClass Determines how vertex messages are combined
+   * @return messageCombinerClass Determines how vertex messages are combined
    */
-  public Class<? extends Combiner> getCombinerClass() {
-    return VERTEX_COMBINER_CLASS.get(this);
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    return MESSAGE_COMBINER_CLASS.get(this);
   }
 
   /**
-   * Set the vertex combiner class (optional)
+   * Set the message combiner class (optional)
    *
-   * @param vertexCombinerClass Determines how vertex messages are combined
+   * @param messageCombinerClass Determines how vertex messages are combined
    */
-  public void setCombinerClass(
-      Class<? extends Combiner> vertexCombinerClass) {
-    VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
   }
 
   /**
@@ -525,6 +526,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the vertex value combiner class (optional)
+   *
+   * @param vertexValueCombinerClass Determines how vertex values are combined
+   */
+  public final void setVertexValueCombinerClass(
+      Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
+    VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
+  }
+
+  /**
    * Set the worker context class (optional)
    *
    * @param workerContextClass Determines what code is executed on a each

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4dadd29..89fce61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -19,7 +19,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
@@ -34,8 +34,10 @@ import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.factories.VertexIdFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -150,15 +152,20 @@ public interface GiraphConstants {
   ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
       ClassConfOption.create("giraph.worker.observers", null,
           WorkerObserver.class, "Classes for Worker Observer - optional");
-  /** Vertex combiner class - optional */
-  ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
-      ClassConfOption.create("giraph.combinerClass", null, Combiner.class,
-          "Vertex combiner class - optional");
+  /** Message combiner class - optional */
+  ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
+      ClassConfOption.create("giraph.messageCombinerClass", null,
+          MessageCombiner.class, "Message combiner class - optional");
   /** Vertex resolver class - optional */
   ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
       ClassConfOption.create("giraph.vertexResolverClass",
           DefaultVertexResolver.class, VertexResolver.class,
           "Vertex resolver class - optional");
+  /** Vertex value combiner class - optional */
+  ClassConfOption<VertexValueCombiner> VERTEX_VALUE_COMBINER_CLASS =
+      ClassConfOption.create("giraph.vertexValueCombinerClass",
+          DefaultVertexValueCombiner.class, VertexValueCombiner.class,
+          "Vertex value combiner class - optional");
 
   /** Which language computation is implemented in */
   EnumConfOption<Language> COMPUTATION_LANGUAGE =
@@ -588,6 +595,20 @@ public interface GiraphConstants {
           "request size is M, and a worker has P partitions, than its " +
           "initial partition buffer size will be (M / P) * (1 + A).");
 
+  /** Maximum size of vertices (in bytes) per peer before flush */
+  IntConfOption MAX_VERTEX_REQUEST_SIZE =
+      new IntConfOption("giraph.vertexRequestSize", 512 * ONE_KB,
+          "Maximum size of vertices (in bytes) per peer before flush");
+
+  /**
+   * Additional size (expressed as a ratio) of each per-partition buffer on
+   * top of the average size for vertices.
+   */
+  FloatConfOption ADDITIONAL_VERTEX_REQUEST_SIZE =
+      new FloatConfOption("giraph.additionalVertexRequestSize", 0.2f,
+          "Additional size (expressed as a ratio) of each per-partition " +
+              "buffer on top of the average size.");
+
   /** Maximum size of edges (in bytes) per peer before flush */
   IntConfOption MAX_EDGE_REQUEST_SIZE =
       new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB,
@@ -595,7 +616,7 @@ public interface GiraphConstants {
 
   /**
    * Additional size (expressed as a ratio) of each per-partition buffer on
-   * top of the average size.
+   * top of the average size for edges.
    */
   FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
       new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f,
@@ -665,9 +686,9 @@ public interface GiraphConstants {
   LongConfOption INPUT_SPLIT_MAX_VERTICES =
       new LongConfOption("giraph.InputSplitMaxVertices", -1,
           "To limit outlier vertex input splits from producing too many " +
-          "vertices or to help with testing, the number of vertices loaded " +
-          "from an input split can be limited. By default, everything is " +
-          "loaded.");
+              "vertices or to help with testing, the number of vertices " +
+              "loaded from an input split can be limited. By default, " +
+              "everything is loaded.");
 
   /**
    * To limit outlier vertex input splits from producing too many vertices or
@@ -677,9 +698,9 @@ public interface GiraphConstants {
   LongConfOption INPUT_SPLIT_MAX_EDGES =
       new LongConfOption("giraph.InputSplitMaxEdges", -1,
           "To limit outlier vertex input splits from producing too many " +
-          "vertices or to help with testing, the number of edges loaded " +
-          "from an input split can be limited. By default, everything is " +
-          "loaded.");
+              "vertices or to help with testing, the number of edges loaded " +
+              "from an input split can be limited. By default, everything is " +
+              "loaded.");
 
   /**
    * To minimize network usage when reading input splits,

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 435dfa5..6bb6c00 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.OutEdges;
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -87,8 +88,7 @@ import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
  */
 @SuppressWarnings("unchecked")
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends GiraphConfiguration {
+    V extends Writable, E extends Writable> extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
   /** Value (IVEMM) Factories */
@@ -429,12 +429,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Get the user's subclassed {@link Combiner} class.
+   * Get the user's subclassed
+   * {@link org.apache.giraph.combiner.MessageCombiner} class.
    *
    * @return User's combiner class
    */
-  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
-    return classes.getCombinerClass();
+  public Class<? extends MessageCombiner<I, ? extends Writable>>
+  getMessageCombinerClass() {
+    return classes.getMessageCombinerClass();
   }
 
   /**
@@ -444,8 +446,9 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @return Instantiated user combiner class
    */
   @SuppressWarnings("rawtypes")
-  public <M extends Writable> Combiner<I, M> createCombiner() {
-    Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
+  public <M extends Writable> MessageCombiner<I, M> createMessageCombiner() {
+    Class<? extends MessageCombiner<I, M>> klass =
+        classes.getMessageCombinerClass();
     return ReflectionUtils.newInstance(klass, this);
   }
 
@@ -454,8 +457,29 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return True iff user set a combiner class
    */
-  public boolean useCombiner() {
-    return classes.hasCombinerClass();
+  public boolean useMessageCombiner() {
+    return classes.hasMessageCombinerClass();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexValueCombiner} class.
+   *
+   * @return User's vertex value combiner class
+   */
+  public Class<? extends VertexValueCombiner<V>>
+  getVertexValueCombinerClass() {
+    return classes.getVertexValueCombinerClass();
+  }
+
+  /**
+   * Create a user vertex value combiner class
+   *
+   * @return Instantiated user vertex value combiner class
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexValueCombiner<V> createVertexValueCombiner() {
+    return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
   }
 
   /**
@@ -979,7 +1003,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Update Computation and Combiner class used
+   * Update Computation and MessageCombiner class used
    *
    * @param superstepClasses SuperstepClasses
    */
@@ -999,6 +1023,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
           (Class<? extends Writable>) classList[4];
       classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
     }
-    classes.setCombinerClass(superstepClasses.getCombinerClass());
+    classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 23df689..1694d36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -181,7 +181,7 @@ public class EdgeStore<I extends WritableComparable,
             Integer partitionId;
             while ((partitionId = partitionIdQueue.poll()) != null) {
               Partition<I, V, E> partition =
-                  service.getPartitionStore().getPartition(partitionId);
+                  service.getPartitionStore().getOrCreatePartition(partitionId);
               ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
                   transientEdges.remove(partitionId);
               for (I vertexId : partitionEdges.keySet()) {
@@ -196,7 +196,15 @@ public class EdgeStore<I extends WritableComparable,
                       outEdges);
                   partition.putVertex(vertex);
                 } else {
-                  vertex.setEdges(outEdges);
+                  // A vertex may exist with or without edges initially
+                  // and optimize the case of no initial edges
+                  if (vertex.getNumEdges() == 0) {
+                    vertex.setEdges(outEdges);
+                  } else {
+                    for (Edge<I, E> edge : outEdges) {
+                      vertex.addEdge(edge);
+                    }
+                  }
                   // Some Partition implementations (e.g. ByteArrayPartition)
                   // require us to put back the vertex after modifying it.
                   partition.saveVertex(vertex);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 77d9f5e..1fe1d10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -153,7 +153,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       }
 
       Partition<I, V, E> partition =
-          serviceWorker.getPartitionStore().getPartition(partitionId);
+          serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
 
       Computation<I, V, E, M1, M2> computation =
           (Computation<I, V, E, M1, M2>) configuration.createComputation();

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
new file mode 100644
index 0000000..4dc6384
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default {@link VertexValueCombiner}: keeps the original vertex value
+ * untouched and ignores the other value that was loaded for the same id.
+ *
+ * @param <I> Vertex id (not used by this combiner)
+ * @param <V> Vertex data
+ * @param <E> Edge data (not used by this combiner)
+ */
+public class DefaultVertexValueCombiner<
+    I extends WritableComparable, V extends Writable, E extends Writable>
+    implements VertexValueCombiner<V> {
+  @Override
+  public void combine(V originalVertexValue, V vertexValue) {
+    // Intentional no-op: originalVertexValue is left as-is and
+    // vertexValue is discarded.
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
new file mode 100644
index 0000000..7891434
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * When vertex values with the same vertex id are loaded, an implementation
+ * of this interface specifies how to combine those values.  Edges loaded
+ * will be added to the EdgeStore.
+ *
+ * @param <V> Vertex data
+ */
+public interface VertexValueCombiner<V extends Writable> {
+  /**
+   * Combine vertexValue into originalVertexValue by modifying
+   * originalVertexValue in place (nothing is returned).
+   *
+   * @param originalVertexValue Value that receives the combined result
+   * @param vertexValue Value to be merged into originalVertexValue
+   */
+  void combine(V originalVertexValue, V vertexValue);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
new file mode 100644
index 0000000..1af5b73
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with int ids and int vertex values.
+ *
+ * Each line consists of: vertex_id vertex_value neighbor1 neighbor2 ...
+ * where fields are separated by a single tab or space character.
+ */
+public class IntIntNullTextVertexInputFormat
+    extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable> {
+  /** Separator of the vertex and neighbors */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with
+   * {@link org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat}.
+   */
+  public class IntIntNullVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /** Cached vertex id for the current line */
+    private IntWritable id;
+    /** Cached vertex value for the current line */
+    private IntWritable value;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      // A valid line needs at least an id and a value; report malformed
+      // input clearly instead of failing with an index exception.
+      if (tokens.length < 2) {
+        throw new IOException("preprocessLine: Expected at least a vertex " +
+            "id and a vertex value, but got '" + line + "'");
+      }
+      id = new IntWritable(Integer.parseInt(tokens[0]));
+      value = new IntWritable(Integer.parseInt(tokens[1]));
+      return tokens;
+    }
+
+    @Override
+    protected IntWritable getId(String[] tokens) throws IOException {
+      // Parsed and cached by preprocessLine() for the current line
+      return id;
+    }
+
+    @Override
+    protected IntWritable getValue(String[] tokens) throws IOException {
+      // Parsed and cached by preprocessLine() for the current line
+      return value;
+    }
+
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      // Every token after the id and the value is a neighbor id
+      List<Edge<IntWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 2);
+      for (int n = 2; n < tokens.length; n++) {
+        edges.add(EdgeFactory.create(
+            new IntWritable(Integer.parseInt(tokens[n]))));
+      }
+      return edges;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index 6a795a8..ac7f5b7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -85,16 +85,11 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      * Create the line record reader. Override this to use a different
      * underlying record reader (useful for testing).
      *
-     * @param inputSplit
-     *          the split to read
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record reader to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
+     * @param inputSplit the split to read
+     * @param context the context passed to initialize
+     * @return the record reader to be used
+     * @throws IOException exception that can be thrown during creation
+     * @throws InterruptedException exception that can be thrown during creation
      */
     protected RecordReader<LongWritable, Text>
     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
@@ -157,22 +152,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
     /**
      * Reads vertex id from the current line.
      *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the current line
+     * @return the vertex id corresponding to the line
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract I getId(Text line) throws IOException;
 
     /**
      * Reads vertex value from the current line.
      *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex value corresponding to the line
+     * @param line the current line
+     * @return the vertex value corresponding to the line
      * @throws IOException
      *           exception that can be thrown while reading
      */
@@ -183,8 +173,7 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
    * Abstract class to be implemented by the user to read a vertex value from
    * each text line after preprocessing it.
    *
-   * @param <T>
-   *          The resulting type of preprocessing.
+   * @param <T> The resulting type of preprocessing.
    */
   protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
       extends TextVertexValueReader {
@@ -226,12 +215,9 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      * Preprocess the line so other methods can easily read necessary
      * information for creating vertex.
      *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the current line to be read
+     * @return the preprocessed object
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract T preprocessLine(Text line) throws IOException;
 
@@ -240,22 +226,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      *
      * @param line
      *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @return the vertex id
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract I getId(T line) throws IOException;
 
     /**
      * Reads vertex value from the preprocessed line.
      *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex value
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the object obtained by preprocessing the line
+     * @return the vertex value
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract V getValue(T line) throws IOException;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 5b870c5..fcb5b87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -73,6 +74,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
   /** V param vertex value factory index in classList */
   private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
+  /** V param vertex value combiner index in classList */
+  private static final int VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX = 0;
 
   /**
    * The Configuration object for use in the validation test.
@@ -138,7 +141,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     verifyEdgeInputFormatGenericTypes();
     verifyVertexOutputFormatGenericTypes();
     verifyVertexResolverGenericTypes();
-    verifyVertexCombinerGenericTypes();
+    verifyVertexValueCombinerGenericTypes();
+    verifyMessageCombinerGenericTypes();
     verifyVertexValueFactoryGenericTypes();
   }
 
@@ -240,17 +244,35 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     }
   }
 
-  /** If there is a combiner type, verify its generic params match the job. */
-  private void verifyVertexCombinerGenericTypes() {
-    Class<? extends Combiner<I, M2>> vertexCombinerClass =
-      conf.getCombinerClass();
-    if (vertexCombinerClass != null) {
+  /**
+   * If there is a vertex value combiner type, verify its
+   * generic params match the job.
+   */
+  private void verifyVertexValueCombinerGenericTypes() {
+    Class<? extends VertexValueCombiner<V>> vertexValueCombiner =
+        conf.getVertexValueCombinerClass();
+    if (vertexValueCombiner != null) {
+      Class<?>[] classList =
+          getTypeArguments(VertexValueCombiner.class, vertexValueCombiner);
+      checkAssignable(classList, VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX,
+          vertexValueType(), VertexValueCombiner.class, "vertex value");
+    }
+  }
+
+  /**
+   * If there is a message combiner type, verify its
+   * generic params match the job.
+   */
+  private void verifyMessageCombinerGenericTypes() {
+    Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
+      conf.getMessageCombinerClass();
+    if (messageCombinerClass != null) {
       Class<?>[] classList =
-          getTypeArguments(Combiner.class, vertexCombinerClass);
-      checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(), Combiner.class,
-          "vertex index");
+          getTypeArguments(MessageCombiner.class, messageCombinerClass);
+      checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+          MessageCombiner.class, "vertex index");
       checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
-          outgoingMessageValueType(), Combiner.class, "message value");
+          outgoingMessageValueType(), MessageCombiner.class, "message value");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
index 6b2eedf..c7e9eae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
@@ -17,7 +17,7 @@
  */
 package org.apache.giraph.jython;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -281,8 +281,8 @@ public class JythonJob {
   private final TypeHolder message_value = new TypeHolder();
   /** Computation class */
   private String computation_name;
-  /** Combiner class */
-  private Class<? extends Combiner> combiner;
+  /** MessageCombiner class */
+  private Class<? extends MessageCombiner> messageCombiner;
   /** Java options */
   private final List<String> java_options = Lists.newArrayList();
   /** Giraph options */
@@ -342,12 +342,13 @@ public class JythonJob {
     return giraph_options;
   }
 
-  public Class<? extends Combiner> getCombiner() {
-    return combiner;
+  public Class<? extends MessageCombiner> getMessageCombiner() {
+    return messageCombiner;
   }
 
-  public void setCombiner(Class<? extends Combiner> combiner) {
-    this.combiner = combiner;
+  public void setMessageCombiner(
+      Class<? extends MessageCombiner> messageCombiner) {
+    this.messageCombiner = messageCombiner;
   }
 
   public String getComputation_name() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index cf7356c..287fdb9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
@@ -48,7 +48,10 @@ public abstract class MasterCompute
   private MasterAggregatorUsage masterAggregatorUsage;
   /** Graph state */
   private GraphState graphState;
-  /** Computation and Combiner class used, which can be switched by master */
+  /**
+   * Computation and MessageCombiner classes used, which can be
+   * switched by master
+   */
   private SuperstepClasses superstepClasses;
 
   /**
@@ -143,26 +146,27 @@ public abstract class MasterCompute
   }
 
   /**
-   * Set Combiner class to be used
+   * Set MessageCombiner class to be used
    *
-   * @param combinerClass Combiner class
+   * @param combinerClass MessageCombiner class
    */
-  public final void setCombiner(Class<? extends Combiner> combinerClass) {
-    superstepClasses.setCombinerClass(combinerClass);
+  public final void setMessageCombiner(
+      Class<? extends MessageCombiner> combinerClass) {
+    superstepClasses.setMessageCombinerClass(combinerClass);
   }
 
   /**
-   * Get Combiner class to be used
+   * Get MessageCombiner class to be used
    *
-   * @return Combiner class
+   * @return MessageCombiner class
    */
-  public final Class<? extends Combiner> getCombiner() {
+  public final Class<? extends MessageCombiner> getMessageCombiner() {
     // Might be called prior to classes being set, do not return NPE
     if (superstepClasses == null) {
       return null;
     }
 
-    return superstepClasses.getCombinerClass();
+    return superstepClasses.getMessageCombinerClass();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 7a7df05..8344910 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.graph.Computation;
@@ -35,13 +35,13 @@ import java.lang.reflect.Modifier;
 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 
 /**
- * Holds Computation and Combiner class.
+ * Holds Computation and MessageCombiner class.
  */
 public class SuperstepClasses implements Writable {
   /** Computation class to be used in the following superstep */
   private Class<? extends Computation> computationClass;
-  /** Combiner class to be used in the following superstep */
-  private Class<? extends Combiner> combinerClass;
+  /** MessageCombiner class to be used in the following superstep */
+  private Class<? extends MessageCombiner> messageCombinerClass;
 
   /**
    * Default constructor
@@ -56,27 +56,28 @@ public class SuperstepClasses implements Writable {
    */
   @SuppressWarnings("unchecked")
   public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
-    this(conf.getComputationClass(), conf.getCombinerClass());
+    this(conf.getComputationClass(), conf.getMessageCombinerClass());
   }
 
   /**
    * Constructor
    *
    * @param computationClass Computation class
-   * @param combinerClass Combiner class
+   * @param messageCombinerClass MessageCombiner class
    */
   public SuperstepClasses(Class<? extends Computation> computationClass,
-      Class<? extends Combiner> combinerClass) {
+      Class<? extends MessageCombiner> messageCombinerClass) {
     this.computationClass = computationClass;
-    this.combinerClass = combinerClass;
+    this.messageCombinerClass =
+        messageCombinerClass;
   }
 
   public Class<? extends Computation> getComputationClass() {
     return computationClass;
   }
 
-  public Class<? extends Combiner> getCombinerClass() {
-    return combinerClass;
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    return messageCombinerClass;
   }
 
   public void setComputationClass(
@@ -84,13 +85,15 @@ public class SuperstepClasses implements Writable {
     this.computationClass = computationClass;
   }
 
-  public void setCombinerClass(Class<? extends Combiner> combinerClass) {
-    this.combinerClass = combinerClass;
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    this.messageCombinerClass =
+        messageCombinerClass;
   }
 
   /**
-   * Verify that types of current Computation and Combiner are valid. If types
-   * don't match an {@link IllegalStateException} will be thrown.
+   * Verify that types of current Computation and MessageCombiner are valid.
+   * If types don't match an {@link IllegalStateException} will be thrown.
    *
    * @param conf Configuration to verify this with
    * @param checkMatchingMesssageTypes Check that the incoming/outgoing
@@ -128,13 +131,13 @@ public class SuperstepClasses implements Writable {
       throw new IllegalStateException("verifyTypesMatch: " +
           "Message type can't be abstract class" + outgoingMessageType);
     }
-    if (combinerClass != null) {
+    if (messageCombinerClass != null) {
       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
-          Combiner.class, combinerClass);
+          MessageCombiner.class, messageCombinerClass);
       verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
-          "Vertex id", combinerClass);
+          "Vertex id", messageCombinerClass);
       verifyTypes(outgoingMessageType, combinerTypes[1],
-          "Outgoing message", combinerClass);
+          "Outgoing message", messageCombinerClass);
     }
   }
 
@@ -160,13 +163,13 @@ public class SuperstepClasses implements Writable {
   @Override
   public void write(DataOutput output) throws IOException {
     WritableUtils.writeClass(computationClass, output);
-    WritableUtils.writeClass(combinerClass, output);
+    WritableUtils.writeClass(messageCombinerClass, output);
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     computationClass = WritableUtils.readClass(input);
-    combinerClass = WritableUtils.readClass(input);
+    messageCombinerClass = WritableUtils.readClass(input);
   }
 
   @Override
@@ -174,6 +177,7 @@ public class SuperstepClasses implements Writable {
     String computationName = computationClass == null ? "_not_set_" :
         computationClass.getName();
     return "(computation=" + computationName + ",combiner=" +
-        ((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
+        ((messageCombinerClass == null) ? "null" :
+            messageCombinerClass.getName()) + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
index f2b8552..ec8a7d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -44,11 +46,14 @@ public abstract class BasicPartition<I extends WritableComparable,
   private int id;
   /** Context used to report progress */
   private Progressable progressable;
+  /** Vertex value combiner */
+  private VertexValueCombiner<V> vertexValueCombiner;
 
   @Override
   public void initialize(int partitionId, Progressable progressable) {
     setId(partitionId);
     setProgressable(progressable);
+    vertexValueCombiner = conf.createVertexValueCombiner();
   }
 
   @Override
@@ -84,6 +89,21 @@ public abstract class BasicPartition<I extends WritableComparable,
     this.progressable = progressable;
   }
 
+  public VertexValueCombiner<V> getVertexValueCombiner() {
+    return vertexValueCombiner;
+  }
+
+  @Override
+  public void addPartitionVertices(VertexIterator<I, V, E> vertexIterator) {
+    while (vertexIterator.hasNext()) {
+      vertexIterator.next();
+      // Release the vertex if it was put, otherwise reuse as an optimization
+      if (putOrCombine(vertexIterator.getVertex())) {
+        vertexIterator.releaseVertex();
+      }
+    }
+  }
+
   @Override
   public void write(DataOutput output) throws IOException {
     output.writeInt(id);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 6eaa6d7..cef39cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.partition;
 
+import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Ints;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.WritableUtils;
@@ -24,9 +27,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-import com.google.common.collect.MapMaker;
-import com.google.common.primitives.Ints;
-
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +44,7 @@ import java.util.concurrent.ConcurrentMap;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@NotThreadSafe
 public class ByteArrayPartition<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends BasicPartition<I, V, E>
@@ -55,6 +57,8 @@ public class ByteArrayPartition<I extends WritableComparable,
   private ConcurrentMap<I, byte[]> vertexMap;
   /** Representative vertex */
   private Vertex<I, V, E> representativeVertex;
+  /** Representative combiner vertex */
+  private Vertex<I, V, E> representativeCombinerVertex;
   /** Use unsafe serialization */
   private boolean useUnsafeSerialization;
 
@@ -73,6 +77,11 @@ public class ByteArrayPartition<I extends WritableComparable,
         getConf().createVertexId(),
         getConf().createVertexValue(),
         getConf().createOutEdges());
+    representativeCombinerVertex = getConf().createVertex();
+    representativeCombinerVertex.initialize(
+        getConf().createVertexId(),
+        getConf().createVertexValue(),
+        getConf().createOutEdges());
     useUnsafeSerialization = getConf().useUnsafeSerialization();
   }
 
@@ -125,8 +134,64 @@ public class ByteArrayPartition<I extends WritableComparable,
         (ByteArrayPartition<I, V, E>) partition;
     for (Map.Entry<I, byte[]> entry :
         byteArrayPartition.vertexMap.entrySet()) {
-      vertexMap.put(entry.getKey(), entry.getValue());
+
+      byte[] oldVertexBytes =
+          vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
+      if (oldVertexBytes == null) {
+        continue;
+      }
+
+      // Note that vertex combining is going to be expensive compared to
+      // SimplePartition since here we have to deserialize the vertices,
+      // combine them, and then reserialize them.  If the vertex doesn't exist,
+      // just add the new vertex as a byte[]
+      synchronized (this) {
+        // Combine the vertex values
+        WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+            representativeVertex, useUnsafeSerialization, getConf());
+        WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
+            representativeCombinerVertex, useUnsafeSerialization, getConf());
+        getVertexValueCombiner().combine(representativeVertex.getValue(),
+            representativeCombinerVertex.getValue());
+
+        // Add the edges to the representative vertex
+        for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
+          representativeVertex.addEdge(edge);
+        }
+
+        // Store the combined vertex (value and edges were merged into
+        // representativeVertex), not the unmodified incoming vertex
+        byte[] vertexData = WritableUtils.writeVertexToByteArray(
+            representativeVertex, useUnsafeSerialization, getConf());
+        vertexMap.put(entry.getKey(), vertexData);
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
+    // Optimistically try to first put and then combine if this fails
+    byte[] vertexData =
+        WritableUtils.writeVertexToByteArray(
+            vertex, useUnsafeSerialization, getConf());
+    byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
+    if (oldVertexBytes == null) {
+      return true;
     }
+
+    // Combine the value and merge the edges, the same way addPartition
+    // (and SimplePartition#putOrCombine) do, then reserialize
+    WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+        representativeVertex, useUnsafeSerialization, getConf());
+    getVertexValueCombiner().combine(representativeVertex.getValue(),
+        vertex.getValue());
+    for (Edge<I, E> edge : vertex.getEdges()) {
+      representativeVertex.addEdge(edge);
+    }
+    vertexMap.put(vertex.getId(),
+        WritableUtils.writeVertexToByteArray(
+            representativeVertex, useUnsafeSerialization, getConf()));
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 110ce9d..c37efd5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -222,15 +222,31 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public Partition<I, V, E> getPartition(Integer id) {
-    try {
-      return pool.submit(new GetPartition(id)).get();
+  public Partition<I, V, E> getOrCreatePartition(Integer id) {
+    // Acquire before entering try so finally never unlocks an unheld lock
+    wLock.lock();
+    try {
+      Partition<I, V, E> partition =
+          pool.submit(new GetPartition(id)).get();
+      if (partition == null) {
+        Partition<I, V, E> newPartition =
+            conf.createPartition(id, context);
+        pool.submit(
+            new AddPartition(id, newPartition)).get();
+        return newPartition;
+      } else {
+        return partition;
+      }
     } catch (InterruptedException e) {
+      // Restore the interrupt status before translating the exception
+      Thread.currentThread().interrupt();
       throw new IllegalStateException(
-          "getPartition: cannot retrieve partition " + id, e);
+          "getOrCreatePartition: cannot retrieve partition " + id, e);
     } catch (ExecutionException e) {
       throw new IllegalStateException(
-          "getPartition: cannot retrieve partition " + id, e);
+          "getOrCreatePartition: cannot retrieve partition " + id, e);
+    } finally {
+      wLock.unlock();
     }
   }
 
@@ -263,7 +279,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
 
   @Override
   public Partition<I, V, E> removePartition(Integer id) {
-    Partition<I, V, E> partition = getPartition(id);
+    Partition<I, V, E> partition = getOrCreatePartition(id);
     // we put it back, so the partition can turn INACTIVE and be deleted.
     putPartition(partition);
     deletePartition(id);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index b6b9551..479abcc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -20,6 +20,7 @@ package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -70,13 +71,32 @@ public interface Partition<I extends WritableComparable,
   Vertex<I, V, E> removeVertex(I vertexIndex);
 
   /**
-   * Add a partition's vertices
+   * Add a partition's vertices.  If a vertex to be added doesn't exist,
+   * add it.  If the vertex already exists, use the
+   * VertexValueCombiner to combine them.
    *
    * @param partition Partition to add
    */
   void addPartition(Partition<I, V, E> partition);
 
   /**
+   * Put this vertex or combine it
+   *
+   * @param vertex Vertex to put or combine
+   * @return True if the vertex was put (hint to release object)
+   */
+  boolean putOrCombine(Vertex<I, V, E> vertex);
+
+  /**
+   * Add vertices to a partition.  If a vertex to be added doesn't exist,
+   * add it.  If the vertex already exists, use the
+   * VertexValueCombiner to combine them.
+   *
+   * @param vertexIterator Vertices to add
+   */
+  void addPartitionVertices(VertexIterator<I, V, E> vertexIterator);
+
+  /**
    * Get the number of vertices in this partition
    *
    * @return Number of vertices

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 763397e..fdc20a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public abstract class PartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable> {
-
   /**
    * Add a new partition to the store or just the vertices from the partition
    * to the old partition.
@@ -40,17 +39,18 @@ public abstract class PartitionStore<I extends WritableComparable,
   public abstract void addPartition(Partition<I, V, E> partition);
 
   /**
-   * Get a partition. Note: user has to put back it to the store through
-   * {@link #putPartition(Partition)} after use.
+   * Get or create a partition. Note: user has to put back
+   * it to the store through {@link #putPartition(Partition)} after use.
    *
    * @param partitionId Partition id
-   * @return The requested partition
+   * @return The requested partition (never null)
    */
-  public abstract Partition<I, V, E> getPartition(Integer partitionId);
+  public abstract Partition<I, V, E> getOrCreatePartition(Integer partitionId);
 
   /**
    * Put a partition back to the store. Use this method to be put a partition
-   * back after it has been retrieved through {@link #getPartition(Integer)}.
+   * back after it has been retrieved through
+   * {@link #getOrCreatePartition(Integer)}.
    *
    * @param partition Partition
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 0c1b404..1609846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,14 +18,15 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Maps;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-import com.google.common.collect.Maps;
-
+import javax.annotation.concurrent.ThreadSafe;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
  * @param <V> Vertex data
  * @param <E> Edge data
  */
+@ThreadSafe
 @SuppressWarnings("rawtypes")
 public class SimplePartition<I extends WritableComparable,
     V extends Writable, E extends Writable>
@@ -81,9 +83,34 @@ public class SimplePartition<I extends WritableComparable,
   }
 
   @Override
+  public boolean putOrCombine(Vertex<I, V, E> vertex) {
+    Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
+    if (originalVertex == null) {
+      originalVertex =
+          vertexMap.putIfAbsent(vertex.getId(), vertex);
+      if (originalVertex == null) {
+        return true;
+      }
+    }
+
+    synchronized (originalVertex) {
+      // Combine the vertex values
+      getVertexValueCombiner().combine(
+          originalVertex.getValue(), vertex.getValue());
+
+      // Add the edges to the representative vertex
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        originalVertex.addEdge(edge);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
   public void addPartition(Partition<I, V, E> partition) {
     for (Vertex<I, V, E> vertex : partition) {
-      vertexMap.put(vertex.getId(), vertex);
+      putOrCombine(vertex);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index ae17aac..79c18c3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -18,13 +18,12 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Maps;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import com.google.common.collect.Maps;
-
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -67,12 +66,22 @@ public class SimplePartitionStore<I extends WritableComparable,
         return;
       }
     }
+    // This is thread-safe
     oldPartition.addPartition(partition);
   }
 
   @Override
-  public Partition<I, V, E> getPartition(Integer partitionId) {
-    return partitions.get(partitionId);
+  public Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
+    Partition<I, V, E> oldPartition = partitions.get(partitionId);
+    if (oldPartition == null) {
+      Partition<I, V, E> newPartition =
+          conf.createPartition(partitionId, context);
+      oldPartition = partitions.putIfAbsent(partitionId, newPartition);
+      if (oldPartition == null) {
+        return newPartition;
+      }
+    }
+    return oldPartition;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 7e2b73b..4958ae3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -55,7 +55,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
    * deserializd right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
-    if (!getConf().useCombiner()) {
+    if (!getConf().useMessageCombiner()) {
       useMessageSizeEncoding = getConf().useMessageSizeEncoding();
     } else {
       useMessageSizeEncoding = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 4bc4f4d..e441f03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.utils;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -25,7 +27,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
@@ -35,6 +37,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -54,9 +57,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -108,7 +108,7 @@ public final class ConfigurationUtils {
         "for the vertex output");
     OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
         "for the edge output");
-    OPTIONS.addOption("c", "combiner", true, "Combiner class");
+    OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
     OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
@@ -277,8 +277,14 @@ public final class ConfigurationUtils {
       TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
     }
     if (cmd.hasOption("c")) {
-      conf.setCombinerClass(
-          (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
+      conf.setMessageCombinerClass(
+          (Class<? extends MessageCombiner>)
+              Class.forName(cmd.getOptionValue("c")));
+    }
+    if (cmd.hasOption("vc")) {
+      conf.setVertexValueCombinerClass(
+          (Class<? extends VertexValueCombiner>)
+              Class.forName(cmd.getOptionValue("vc")));
     }
     if (cmd.hasOption("ve")) {
       conf.setOutEdgesClass(

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
new file mode 100644
index 0000000..dced9bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Iterates over vertices stored in an ExtendedDataOutput such that
+ * the ownership of the vertex id can be transferred to another object.
+ * This optimization cuts down on the number of objects instantiated and
+ * garbage collected
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class VertexIterator<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /** Reader of the serialized edges */
+  private final ExtendedDataInput extendedDataInput;
+  /** Current vertex */
+  private Vertex<I, V, E> vertex;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+
+  /**
+   * Constructor.
+   *
+   * @param extendedDataOutput Extended data output
+   * @param configuration Configuration
+   */
+  public VertexIterator(
+      ExtendedDataOutput extendedDataOutput,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
+    extendedDataInput = configuration.createExtendedDataInput(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    this.configuration = configuration;
+    resetEmptyVertex();
+  }
+
+  /**
+   * Reset the empty Vertex to an initial state.
+   */
+  private void resetEmptyVertex() {
+    vertex = configuration.createVertex();
+    I id = configuration.createVertexId();
+    V value = configuration.createVertexValue();
+    OutEdges<I, E> edges = configuration.createOutEdges();
+    vertex.initialize(id, value, edges);
+  }
+
+  /**
+   * Returns true if the iteration has more elements.
+   *
+   * @return True if the iteration has more elements.
+   */
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  /**
+   * Moves to the next element in the iteration.
+   */
+  public void next() {
+    // If the vertex was released, create another one
+    if (vertex == null) {
+      resetEmptyVertex();
+    }
+
+    // If the vertex id was released, create another one
+    if (vertex.getId() == null) {
+      vertex.initialize(configuration.createVertexId(), vertex.getValue());
+    }
+
+    try {
+      WritableUtils.reinitializeVertexFromDataInput(
+          extendedDataInput, vertex, configuration);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  /**
+   * Get the current vertex id.  Ihis object's contents are only guaranteed
+   * until next() is called.  To take ownership of this object call
+   * releaseCurrentVertexId() after getting a reference to this object.
+   *
+   * @return Current vertex id
+   */
+  public I getCurrentVertexId() {
+    return vertex.getId();
+  }
+
+  /**
+   * The backing store of the current vertex id is now released.
+   * Further calls to getCurrentVertexId () without calling next()
+   * will return null.
+   *
+   * @return Current vertex id that was released
+   */
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertex.getId();
+    vertex.initialize(null, vertex.getValue());
+    return releasedVertexId;
+  }
+
+  public Vertex<I, V, E> getVertex() {
+    return vertex;
+  }
+
+  /**
+   * Release the ownership of the Vertex object to the caller
+   *
+   * @return Released Vertex object
+   */
+  public Vertex<I, V, E> releaseVertex() {
+    Vertex<I, V, E> releasedVertex = vertex;
+    vertex = null;
+    return releasedVertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 9163c08..3f8382e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -530,7 +530,7 @@ public class WritableUtils {
   }
 
   /**
-   * Reads data from input stream to inizialize Vertex.
+   * Reads data from input stream to initialize Vertex.
    *
    * @param input The input stream
    * @param conf Configuration

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 112b76d..a92ddf8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -571,7 +571,7 @@ public class BspServiceWorker<I extends WritableComparable,
         new ArrayList<PartitionStats>();
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
+          getPartitionStore().getOrCreatePartition(partitionId);
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
               partition.getVertexCount(),
@@ -974,7 +974,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
 
               Partition<I, V, E> partition =
-                  getPartitionStore().getPartition(partitionId);
+                  getPartitionStore().getOrCreatePartition(partitionId);
               long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
@@ -1082,7 +1082,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
 
               Partition<I, V, E> partition =
-                  getPartitionStore().getPartition(partitionId);
+                  getPartitionStore().getOrCreatePartition(partitionId);
               long vertices = 0;
               long edges = 0;
               long partitionEdgeCount = partition.getEdgeCount();
@@ -1241,7 +1241,7 @@ public class BspServiceWorker<I extends WritableComparable,
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
+          getPartitionStore().getOrCreatePartition(partitionId);
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
       // write messages

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 115c108..fcdfa5c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -129,7 +129,7 @@ public class RequestTest {
     assertTrue(partitionStore.hasPartition(partitionId));
     int total = 0;
     Partition<IntWritable, IntWritable, IntWritable> partition2 =
-        partitionStore.getPartition(partitionId);	
+        partitionStore.getOrCreatePartition(partitionId);
     for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
       total += vertex.getId().get();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index a8f6f70..1fe3a25 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.combiner.FloatSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestIntFloatPrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
   }
 
   private static class IntFloatNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestIntFloatPrimitiveMessageStores {
   @Test
   public void testIntFloatMessageStore() throws IOException {
     IntFloatMessageStore messageStore =
-        new IntFloatMessageStore(service, new FloatSumCombiner());
+        new IntFloatMessageStore(service, new FloatSumMessageCombiner());
     insertIntFloatMessages(messageStore);
 
     Iterable<FloatWritable> m0 =

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index 0659260..a04b703 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestLongDoublePrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
   }
 
   private static class LongDoubleNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestLongDoublePrimitiveMessageStores {
   @Test
   public void testLongDoubleMessageStore() throws IOException {
     LongDoubleMessageStore messageStore =
-        new LongDoubleMessageStore(service, new DoubleSumCombiner());
+        new LongDoubleMessageStore(service, new DoubleSumMessageCombiner());
     insertLongDoubleMessages(messageStore);
 
     Iterable<DoubleWritable> m0 =


Mime
View raw message