giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject [4/4] git commit: updated refs/heads/trunk to a95066c
Date Wed, 09 Oct 2013 06:35:20 GMT
Everything compiles.
All tests should run.
Next step is to add a test for the vertex combiner.
Should have fixed.
Fixed one bug for byte array partition.
Fixed another bug for too small of a message buffer.
Rebased.
Rebased.
Passes tests.  Need to add two more tests.
1)  Test VertexInputFormat and edge input format.
Done
2) Add a test to check that the vertex value combiner works.
Done
3)  Run an experiment to see if it is faster with my changes.
Passed 'mvn clean verify'
4) Now the edges are added for combined vertices.
5) Clean up.


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a95066cd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a95066cd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a95066cd

Branch: refs/heads/trunk
Commit: a95066cd06bdf4ed238e0bef304f28856817d9c9
Parents: e26b51e
Author: Avery Ching <aching@fb.com>
Authored: Fri Jul 19 00:39:13 2013 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Tue Oct 8 22:33:52 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   4 +-
 .../giraph/benchmark/PageRankBenchmark.java     |   4 +-
 .../benchmark/ShortestPathsBenchmark.java       |   4 +-
 .../benchmark/WeightedPageRankBenchmark.java    |  16 +-
 .../org/apache/giraph/combiner/Combiner.java    |  53 ---
 .../giraph/combiner/DoubleSumCombiner.java      |  39 --
 .../combiner/DoubleSumMessageCombiner.java      |  40 ++
 .../giraph/combiner/FloatSumCombiner.java       |  39 --
 .../combiner/FloatSumMessageCombiner.java       |  40 ++
 .../apache/giraph/combiner/MessageCombiner.java |  57 +++
 .../giraph/combiner/MinimumDoubleCombiner.java  |  41 --
 .../combiner/MinimumDoubleMessageCombiner.java  |  42 ++
 .../giraph/combiner/MinimumIntCombiner.java     |  40 --
 .../combiner/MinimumIntMessageCombiner.java     |  40 ++
 .../giraph/combiner/SimpleSumCombiner.java      |  40 --
 .../combiner/SimpleSumMessageCombiner.java      |  40 ++
 .../org/apache/giraph/comm/SendDataCache.java   |  13 +-
 .../apache/giraph/comm/SendPartitionCache.java  | 108 ++----
 .../giraph/comm/SendVertexIdDataCache.java      |  54 ++-
 .../comm/WorkerClientRequestProcessor.java      |   5 +-
 .../messages/InMemoryMessageStoreFactory.java   |  18 +-
 .../comm/messages/MessageStoreFactory.java      |   2 +-
 .../comm/messages/OneMessagePerVertexStore.java |  25 +-
 .../primitives/IntByteArrayMessageStore.java    |   2 +-
 .../primitives/IntFloatMessageStore.java        |  19 +-
 .../primitives/LongByteArrayMessageStore.java   |   2 +-
 .../primitives/LongDoubleMessageStore.java      |  19 +-
 .../giraph/comm/netty/NettyWorkerClient.java    |   2 +-
 .../NettyWorkerClientRequestProcessor.java      |  79 ++--
 .../giraph/comm/netty/NettyWorkerServer.java    |   4 +-
 .../comm/netty/handler/RequestEncoder.java      |   2 +-
 .../giraph/comm/requests/RequestType.java       |   2 +
 .../giraph/comm/requests/SendVertexRequest.java |   3 +-
 .../requests/SendWorkerVerticesRequest.java     | 129 +++++++
 .../org/apache/giraph/conf/GiraphClasses.java   |  56 ++-
 .../apache/giraph/conf/GiraphConfiguration.java |  35 +-
 .../org/apache/giraph/conf/GiraphConstants.java |  45 ++-
 .../ImmutableClassesGiraphConfiguration.java    |  48 ++-
 .../java/org/apache/giraph/edge/EdgeStore.java  |  12 +-
 .../apache/giraph/graph/ComputeCallable.java    |   2 +-
 .../graph/DefaultVertexValueCombiner.java       |  40 ++
 .../giraph/graph/VertexValueCombiner.java       |  39 ++
 .../IntIntNullTextVertexInputFormat.java        |  94 +++++
 .../io/formats/TextVertexValueInputFormat.java  |  57 +--
 .../job/GiraphConfigurationValidator.java       |  44 ++-
 .../org/apache/giraph/jython/JythonJob.java     |  15 +-
 .../org/apache/giraph/master/MasterCompute.java |  26 +-
 .../apache/giraph/master/SuperstepClasses.java  |  46 ++-
 .../apache/giraph/partition/BasicPartition.java |  20 +
 .../giraph/partition/ByteArrayPartition.java    |  66 +++-
 .../partition/DiskBackedPartitionStore.java     |  23 +-
 .../org/apache/giraph/partition/Partition.java  |  22 +-
 .../apache/giraph/partition/PartitionStore.java |  12 +-
 .../giraph/partition/SimplePartition.java       |  33 +-
 .../giraph/partition/SimplePartitionStore.java  |  17 +-
 .../giraph/utils/ByteArrayVertexIdMessages.java |   2 +-
 .../apache/giraph/utils/ConfigurationUtils.java |  20 +-
 .../org/apache/giraph/utils/VertexIterator.java | 143 +++++++
 .../org/apache/giraph/utils/WritableUtils.java  |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   8 +-
 .../org/apache/giraph/comm/RequestTest.java     |   2 +-
 .../TestIntFloatPrimitiveMessageStores.java     |   8 +-
 .../TestLongDoublePrimitiveMessageStores.java   |   8 +-
 .../org/apache/giraph/io/TestEdgeInput.java     | 261 -------------
 .../apache/giraph/io/TestVertexEdgeInput.java   | 385 +++++++++++++++++++
 .../master/TestComputationCombinerTypes.java    |  27 +-
 .../apache/giraph/master/TestSwitchClasses.java |  81 ++--
 .../giraph/partition/TestPartitionStores.java   |  14 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   7 +-
 .../ConnectedComponentsComputationTest.java     |   4 +-
 ...nectedComponentsComputationTestInMemory.java |   4 +-
 .../giraph/examples/MinimumIntCombinerTest.java |  23 +-
 .../examples/TryMultiIpcBindingPortsTest.java   |   4 +-
 .../giraph/vertex/TestComputationTypes.java     |  20 +-
 .../vertex/examples/HiveIntIntNullVertex.java   |  58 +++
 .../giraph/hive/jython/HiveJythonUtils.java     |   6 +-
 .../giraph/hive/input/HiveVertexInputTest.java  |  11 +-
 .../giraph/jython/count-edges-launcher.py       |   2 +-
 src/site/xdoc/quick_start.xml                   |   2 +-
 79 files changed, 1910 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index f960d09..8f046a5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -710,7 +710,7 @@ Release 1.0.0 - 2013-04-15
   GIRAPH-298: Reduce timeout for TestAutoCheckpoint. (majakabiljo via
   aching)
 	
-  GIRAPH-324: Add option to use combiner in benchmarks. (apresta via
+  GIRAPH-324: Add option to use messageCombiner in benchmarks. (apresta via
   aching)
 
   GIRAPH-191: Random walks on graphs (Gianmarco De Francisci Morales
@@ -843,7 +843,7 @@ Release 1.0.0 - 2013-04-15
   GIRAPH-236: Add FindBugs to maven build (Jan van der Lugt via
   aching).
 
-  GIRAPH-224: Netty server-side combiner (apresta via aching).
+  GIRAPH-224: Netty server-side messageCombiner (apresta via aching).
 
   GIRAPH-251: Allow to access the distributed cache from Vertexes and
   WorkerContext (Gianmarco De Francisci Morales via aching).

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index acc1c46..8fd529d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.combiner.FloatSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.IntNullArrayEdges;
@@ -76,7 +76,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
       conf.setComputationClass(PageRankComputation.class);
     }
     conf.setOutEdgesClass(IntNullArrayEdges.class);
-    conf.setCombinerClass(FloatSumCombiner.class);
+    conf.setMessageCombinerClass(FloatSumMessageCombiner.class);
     conf.setVertexInputFormatClass(
         PseudoRandomIntNullVertexInputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 0dd4529..33fc7f2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.giraph.combiner.MinimumDoubleCombiner;
+import org.apache.giraph.combiner.MinimumDoubleMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ArrayListEdges;
@@ -67,7 +67,7 @@ public class ShortestPathsBenchmark extends GiraphBenchmark {
     LOG.info("Using class " + GiraphConstants.COMPUTATION_CLASS.get(conf));
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     if (!NO_COMBINER.optionTurnedOn(cmd)) {
-      conf.setCombinerClass(MinimumDoubleCombiner.class);
+      conf.setMessageCombinerClass(MinimumDoubleMessageCombiner.class);
     }
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
         BenchmarkOption.VERTICES.getOptionLongValue(cmd));

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
index 2077674..8a796ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -18,9 +18,9 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.combiner.DoubleSumCombiner;
 import org.apache.giraph.edge.ArrayListEdges;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.HashMapEdges;
@@ -63,9 +63,10 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
       "Partitioning algorithm (0 for hash partitioning (default), " +
           "1 for range partitioning)");
   /** Option for type of combiner */
-  private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption(
-      "t", "combinerType", true,
-      "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
+  private static final BenchmarkOption MESSAGE_COMBINER_TYPE =
+      new BenchmarkOption("t", "combinerType", true,
+          "MessageCombiner type (0 for no combiner," +
+              " 1 for DoubleSumMessageCombiner (default)");
   /** Option for output format */
   private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption(
       "o", "vertexOutputFormat", true,
@@ -76,7 +77,8 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
     return Sets.newHashSet(
         BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES,
         BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.LOCAL_EDGES_MIN_RATIO,
-        EDGES_CLASS, EDGE_INPUT, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT);
+        EDGES_CLASS, EDGE_INPUT, PARTITIONER,
+        MESSAGE_COMBINER_TYPE, OUTPUT_FORMAT);
   }
 
   /**
@@ -115,8 +117,8 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
 
     LOG.info("Using edges class " +
         GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
-    if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
-      configuration.setCombinerClass(DoubleSumCombiner.class);
+    if (MESSAGE_COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
+      configuration.setMessageCombinerClass(DoubleSumMessageCombiner.class);
     }
 
     if (EDGE_INPUT.optionTurnedOn(cmd)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
deleted file mode 100644
index 7830fff..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstract class to extend for combining messages sent to the same vertex.
- * Combiner for applications where each two messages for one vertex can be
- * combined into one.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public abstract class Combiner<I extends WritableComparable,
-    M extends Writable> {
-  /**
-   * Combine messageToCombine with originalMessage,
-   * by modifying originalMessage.
-   *
-   * @param vertexIndex Index of the vertex getting these messages
-   * @param originalMessage The first message which we want to combine;
-   *                        put the result of combining in this message
-   * @param messageToCombine The second message which we want to combine
-   */
-  public abstract void combine(I vertexIndex, M originalMessage,
-      M messageToCombine);
-
-  /**
-   * Get the initial message. When combined with any other message M,
-   * the result should be M.
-   *
-   * @return Initial message
-   */
-  public abstract M createInitialMessage();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
deleted file mode 100644
index 8da4ba7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * A combiner that sums double-valued messages
- */
-public class DoubleSumCombiner extends
-    Combiner<LongWritable, DoubleWritable> {
-  @Override
-  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
-      DoubleWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public DoubleWritable createInitialMessage() {
-    return new DoubleWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
new file mode 100644
index 0000000..163e0d8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A combiner that sums double-valued messages
+ */
+public class DoubleSumMessageCombiner
+    extends
+    MessageCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
deleted file mode 100644
index d898791..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * A combiner that sums float-valued messages
- */
-public class FloatSumCombiner extends
-    Combiner<IntWritable, FloatWritable> {
-  @Override
-  public void combine(IntWritable vertexIndex, FloatWritable originalMessage,
-      FloatWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public FloatWritable createInitialMessage() {
-    return new FloatWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
new file mode 100644
index 0000000..b13a7f7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * A combiner that sums float-valued messages
+ */
+public class FloatSumMessageCombiner
+    extends
+    MessageCombiner<IntWritable, FloatWritable> {
+  @Override
+  public void combine(IntWritable vertexIndex, FloatWritable originalMessage,
+      FloatWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public FloatWritable createInitialMessage() {
+    return new FloatWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
new file mode 100644
index 0000000..e53ab3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract class to extend for combining messages sent to the same vertex.
+ * MessageCombiner for applications where each two messages for one
+ * vertex can be combined into one.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public abstract class MessageCombiner<I extends WritableComparable,
+    M extends Writable> {
+  /**
+   * Combine messageToCombine with originalMessage, by modifying
+   * originalMessage.  Note that the messageToCombine object
+   * may be reused by the infrastructure, therefore, you cannot directly
+   * use it or any objects from it in original message
+   *
+   * @param vertexIndex Index of the vertex getting these messages
+   * @param originalMessage The first message which we want to combine;
+   *                        put the result of combining in this message
+   * @param messageToCombine The second message which we want to combine
+   *                         (object may be reused - do not reference it or its
+   *                         member objects)
+   */
+  public abstract void combine(I vertexIndex, M originalMessage,
+      M messageToCombine);
+
+  /**
+   * Get the initial message. When combined with any other message M,
+   * the result should be M.
+   *
+   * @return Initial message
+   */
+  public abstract M createInitialMessage();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
deleted file mode 100644
index 0a85d64..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Combiner which finds the minimum of {@link DoubleWritable}.
- */
-public class MinimumDoubleCombiner extends
-    Combiner<LongWritable, DoubleWritable> {
-  @Override
-  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
-      DoubleWritable messageToCombine) {
-    if (originalMessage.get() > messageToCombine.get()) {
-      originalMessage.set(messageToCombine.get());
-    }
-  }
-
-  @Override
-  public DoubleWritable createInitialMessage() {
-    return new DoubleWritable(Double.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
new file mode 100644
index 0000000..a1f4bd7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MessageCombiner which finds the minimum of {@link DoubleWritable}.
+ */
+public class MinimumDoubleMessageCombiner
+    extends
+    MessageCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
+    }
+  }
+
+  @Override
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(Double.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
deleted file mode 100644
index fcef58e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * {@link Combiner} that finds the minimum {@link IntWritable}
- */
-public class MinimumIntCombiner
-    extends Combiner<IntWritable, IntWritable> {
-  @Override
-  public void combine(IntWritable vertexIndex, IntWritable originalMessage,
-      IntWritable messageToCombine) {
-    if (originalMessage.get() > messageToCombine.get()) {
-      originalMessage.set(messageToCombine.get());
-    }
-  }
-
-  @Override
-  public IntWritable createInitialMessage() {
-    return new IntWritable(Integer.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
new file mode 100644
index 0000000..227c6e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * {@link MessageCombiner} that finds the minimum {@link IntWritable}
+ */
+public class MinimumIntMessageCombiner
+    extends MessageCombiner<IntWritable, IntWritable> {
+  @Override
+  public void combine(IntWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
+    }
+  }
+
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
deleted file mode 100644
index 2a11d2f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Combiner which sums up {@link IntWritable} message values.
- */
-public class SimpleSumCombiner
-    extends Combiner<LongWritable, IntWritable> {
-
-  @Override
-  public void combine(LongWritable vertexIndex, IntWritable originalMessage,
-      IntWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public IntWritable createInitialMessage() {
-    return new IntWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
new file mode 100644
index 0000000..1b4f5ef
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MessageCombiner which sums up {@link IntWritable} message values.
+ */
+public class SimpleSumMessageCombiner
+    extends MessageCombiner<LongWritable, IntWritable> {
+
+  @Override
+  public void combine(LongWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
index 6973785..4eb57fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
@@ -23,11 +23,8 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.List;
@@ -37,13 +34,11 @@ import java.util.Map;
  * An abstract structure for caching data by partitions
  * to be sent to workers in bulk. Not thread-safe.
  *
- * @param <I> Vertex id
- * @param <D> Data type
+ * @param <D> Data type of partition cache
  */
 @NotThreadSafe
 @SuppressWarnings("unchecked")
-public abstract class SendDataCache<I extends WritableComparable,
-    D extends Writable> {
+public abstract class SendDataCache<D> {
   /**
    * Internal cache of partitions (index) to their partition caches of
    * type D.
@@ -51,8 +46,6 @@ public abstract class SendDataCache<I extends WritableComparable,
   private final D[] dataCache;
   /** How big to initially make output streams for each worker's partitions */
   private final int[] initialBufferSizes;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
   /** Service worker */
   private final CentralizedServiceWorker serviceWorker;
   /** Size of data (in bytes) for each worker */
@@ -92,7 +85,7 @@ public abstract class SendDataCache<I extends WritableComparable,
       workerPartitionIds.add(partitionOwner.getPartitionId());
       maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
     }
-    dataCache = (D[]) new Writable[maxPartition + 1];
+    dataCache = (D[]) new Object[maxPartition + 1];
 
     int maxWorker = 0;
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
index 524c9f1..8ec3164 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
@@ -17,19 +17,20 @@
  */
 package org.apache.giraph.comm;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GiraphTransferRegulator;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
 
-import java.util.Map;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
 
 /**
  * Caches partition vertices prior to sending.  Aggregating these requests
@@ -40,93 +41,54 @@ import java.util.Map;
  * @param <E> Edge value
  */
 public class SendPartitionCache<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
+    V extends Writable, E extends Writable> extends
+    SendDataCache<ExtendedDataOutput> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionCache.class);
-  /** Input split vertex cache (only used when loading from input split) */
-  private final Map<PartitionOwner, Partition<I, V, E>>
-  ownerPartitionMap = Maps.newHashMap();
-  /** Context */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
-  /**
-   *  Regulates the size of outgoing Collections of vertices read
-   * by the local worker during INPUT_SUPERSTEP that are to be
-   * transfered from <code>inputSplitCache</code> to the owner
-   * of their initial, master-assigned Partition.*
-   */
-  private final GiraphTransferRegulator transferRegulator;
 
   /**
    * Constructor.
    *
-   * @param context Context
-   * @param configuration Configuration
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
    */
-  public SendPartitionCache(
-      Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
-    this.context = context;
-    this.configuration = configuration;
-    transferRegulator =
-        new GiraphTransferRegulator(configuration);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SendPartitionCache: maxVerticesPerTransfer = " +
-          transferRegulator.getMaxVerticesPerTransfer());
-      LOG.info("SendPartitionCache: maxEdgesPerTransfer = " +
-          transferRegulator.getMaxEdgesPerTransfer());
-    }
+  public SendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
+                            CentralizedServiceWorker<?, ?, ?> serviceWorker) {
+    super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
+        ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
   }
 
   /**
-   * Add a vertex to the cache, returning the partition if full
+   * Add a vertex to the cache.
    *
    * @param partitionOwner Partition owner of the vertex
    * @param vertex Vertex to add
-   * @return A partition to send or null, if requirements are not met
+   * @return Size of partitions for this worker
    */
-  public Partition<I, V, E> addVertex(PartitionOwner partitionOwner,
+  public int addVertex(PartitionOwner partitionOwner,
       Vertex<I, V, E> vertex) {
-    Partition<I, V, E> partition =
-        ownerPartitionMap.get(partitionOwner);
-    if (partition == null) {
-      partition = configuration.createPartition(
-          partitionOwner.getPartitionId(),
-          context);
-      ownerPartitionMap.put(partitionOwner, partition);
-    }
-    transferRegulator.incrementCounters(partitionOwner, vertex);
-
-    Vertex<I, V, E> oldVertex = partition.putVertex(vertex);
-    if (oldVertex != null) {
-      LOG.warn("addVertex: Replacing vertex " + oldVertex +
-          " with " + vertex);
+    // Get the data collection
+    ExtendedDataOutput partitionData =
+        getData(partitionOwner.getPartitionId());
+    int taskId = partitionOwner.getWorkerInfo().getTaskId();
+    int originalSize = 0;
+    if (partitionData == null) {
+      partitionData = getConf().createExtendedDataOutput(
+          getInitialBufferSize(taskId));
+      setData(partitionOwner.getPartitionId(), partitionData);
+    } else {
+      originalSize = partitionData.getPos();
     }
-
-    // Requirements met to transfer?
-    if (transferRegulator.transferThisPartition(partitionOwner)) {
-      return ownerPartitionMap.remove(partitionOwner);
+    try {
+      WritableUtils.<I, V, E>writeVertexToDataOutput(
+          partitionData, vertex, getConf());
+    } catch (IOException e) {
+      throw new IllegalStateException("addVertex: Failed to serialize", e);
     }
 
-    return null;
-  }
-
-  /**
-   * Get the owner partition map (for flushing)
-   *
-   * @return Owner partition map
-   */
-  public Map<PartitionOwner, Partition<I, V, E>> getOwnerPartitionMap() {
-    return ownerPartitionMap;
-  }
-
-  /**
-   * Clear the cache.
-   */
-  public void clear() {
-    ownerPartitionMap.clear();
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(taskId, partitionData.getPos() - originalSize);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
index 2623812..afce3ba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
@@ -37,7 +37,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 @NotThreadSafe
 @SuppressWarnings("unchecked")
 public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
-    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<I, B> {
+    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<B> {
   /**
    * Constructor.
    *
@@ -73,20 +73,58 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
   public int addData(WorkerInfo workerInfo,
                      int partitionId, I destVertexId, T data) {
     // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+        getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(destVertexId, data);
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(workerInfo.getTaskId(),
+        partitionData.getSize() - originalSize);
+  }
+
+  /**
+   * This method is similar to the method above,
+   * but use a serialized id to replace original I type
+   * destVertexId.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @param serializedId The byte array to store the serialized target vertex id
+   * @param idPos The length of bytes of serialized id in the byte array above
+   * @param data Data to send to remote worker
+   * @return The number of bytes added to the target worker
+   */
+  public int addData(WorkerInfo workerInfo, int partitionId,
+                     byte[] serializedId, int idPos, T data) {
+    // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+        getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(serializedId, idPos, data);
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(workerInfo.getTaskId(),
+        partitionData.getSize() - originalSize);
+  }
+
+  /**
+   * This method tries to get a partition data from the data cache.
+   * If null, it will create one.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @return The partition data in data cache
+   */
+  private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
+                                                       int partitionId) {
+    // Get the data collection
     B partitionData = getData(partitionId);
-    int originalSize = 0;
     if (partitionData == null) {
       partitionData = createByteArrayVertexIdData();
       partitionData.setConf(getConf());
       partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
       setData(partitionId, partitionData);
-    } else {
-      originalSize = partitionData.getSize();
     }
-    partitionData.add(destVertexId, data);
 
-    // Update the size of cached, outgoing data per worker
-    return incrDataSize(workerInfo.getTaskId(),
-        partitionData.getSize() - originalSize);
+    return partitionData;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index 9bdf9ca..f788051 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -66,9 +66,10 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    *
    * @param partitionOwner Owner of the vertex
    * @param vertex Vertex to send
+   * @return Returns true iff any network I/O occurred.
    */
-  void sendVertexRequest(PartitionOwner partitionOwner,
-                         Vertex<I, V, E> vertex);
+  boolean sendVertexRequest(PartitionOwner partitionOwner,
+                            Vertex<I, V, E> vertex);
 
   /**
    * Send a partition request (no batching).

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 0cdfb73..5f0e929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
@@ -70,23 +70,23 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       MessageValueFactory<M> messageValueFactory) {
     Class<M> messageClass = messageValueFactory.getValueClass();
     MessageStore messageStore;
-    if (conf.useCombiner()) {
+    if (conf.useMessageCombiner()) {
       Class<I> vertexIdClass = conf.getVertexIdClass();
       if (vertexIdClass.equals(IntWritable.class) &&
           messageClass.equals(FloatWritable.class)) {
         messageStore = new IntFloatMessageStore(
             (CentralizedServiceWorker<IntWritable, ?, ?>) service,
-            (Combiner<IntWritable, FloatWritable>)
-                conf.<FloatWritable>createCombiner());
+            (MessageCombiner<IntWritable, FloatWritable>)
+                conf.<FloatWritable>createMessageCombiner());
       } else if (vertexIdClass.equals(LongWritable.class) &&
           messageClass.equals(DoubleWritable.class)) {
         messageStore = new LongDoubleMessageStore(
             (CentralizedServiceWorker<LongWritable, ?, ?>) service,
-            (Combiner<LongWritable, DoubleWritable>)
-                conf.<DoubleWritable>createCombiner());
+            (MessageCombiner<LongWritable, DoubleWritable>)
+                conf.<DoubleWritable>createMessageCombiner());
       } else {
         messageStore = new OneMessagePerVertexStore<I, M>(messageValueFactory,
-          service, conf.<M>createCombiner(), conf);
+          service, conf.<M>createMessageCombiner(), conf);
       }
     } else {
       Class<I> vertexIdClass = conf.getVertexIdClass();
@@ -108,8 +108,8 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       LOG.info("newStore: Created " + messageStore.getClass() +
           " for vertex id " + conf.getVertexIdClass() +
           " and message value " + messageClass + " and" +
-          (conf.useCombiner() ? " combiner " + conf.getCombinerClass() :
-              " no combiner"));
+          (conf.useMessageCombiner() ? " message combiner " +
+              conf.getMessageCombinerClass() : " no combiner"));
     }
     return (MessageStore<I, M>) messageStore;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 254afd4..f582ea2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -34,7 +34,7 @@ public interface MessageStoreFactory<I extends WritableComparable,
   /**
    * Creates new message store.
    *
-   * Note: Combiner class in Configuration can be changed,
+   * Note: MessageCombiner class in Configuration can be changed,
    * this method should return MessageStore which uses current combiner
    *
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 4f150da..acf68ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -35,29 +35,30 @@ import java.util.concurrent.ConcurrentMap;
 /**
  * Implementation of {@link SimpleMessageStore} where we have a single
  * message per vertex.
- * Used when {@link Combiner} is provided.
+ * Used when {@link org.apache.giraph.combiner.MessageCombiner} is provided.
  *
  * @param <I> Vertex id
  * @param <M> Message data
  */
 public class OneMessagePerVertexStore<I extends WritableComparable,
     M extends Writable> extends SimpleMessageStore<I, M, M> {
-  /** Combiner for messages */
-  private final Combiner<I, M> combiner;
+  /** MessageCombiner for messages */
+  private final MessageCombiner<I, M> messageCombiner;
 
   /**
    * @param messageValueFactory Message class held in the store
-   * @param service  Service worker
-   * @param combiner Combiner for messages
-   * @param config   Hadoop configuration
+   * @param service Service worker
+   * @param messageCombiner MessageCombiner for messages
+   * @param config Hadoop configuration
    */
   OneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      Combiner<I, M> combiner,
+      MessageCombiner<I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     super(messageValueFactory, service, config);
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
   }
 
   @Override
@@ -76,7 +77,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       M currentMessage =
           partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
       if (currentMessage == null) {
-        M newMessage = combiner.createInitialMessage();
+        M newMessage = messageCombiner.createInitialMessage();
         currentMessage = partitionMap.putIfAbsent(
             vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
         if (currentMessage == null) {
@@ -84,7 +85,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
         }
       }
       synchronized (currentMessage) {
-        combiner.combine(vertexId, currentMessage,
+        messageCombiner.combine(vertexId, currentMessage,
             vertexIdMessageIterator.getCurrentMessage());
       }
     }
@@ -157,7 +158,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     public MessageStore<I, M> newStore(
         MessageValueFactory<M> messageValueFactory) {
       return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-          config.<M>createCombiner(), config);
+          config.<M>createMessageCombiner(), config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index cdab2e0..c58868a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -81,7 +81,7 @@ public class IntByteArrayMessageStore<M extends Writable>
         new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<IntWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
           new Int2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index a193fb9..d75c758 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages.primitives;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -43,7 +43,7 @@ import java.util.List;
 
 /**
  * Special message store to be used when ids are IntWritable and messages
- * are FloatWritable and combiner is used.
+ * are FloatWritable and messageCombiner is used.
  * Uses fastutil primitive maps in order to decrease number of objects and
  * get better performance.
  */
@@ -51,8 +51,8 @@ public class IntFloatMessageStore
     implements MessageStore<IntWritable, FloatWritable> {
   /** Map from partition id to map from vertex id to message */
   private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
-  /** Message combiner */
-  private final Combiner<IntWritable, FloatWritable> combiner;
+  /** Message messageCombiner */
+  private final MessageCombiner<IntWritable, FloatWritable> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<IntWritable, ?, ?> service;
 
@@ -60,18 +60,19 @@ public class IntFloatMessageStore
    * Constructor
    *
    * @param service Service worker
-   * @param combiner Message combiner
+   * @param messageCombiner Message messageCombiner
    */
   public IntFloatMessageStore(
       CentralizedServiceWorker<IntWritable, ?, ?> service,
-      Combiner<IntWritable, FloatWritable> combiner) {
+      MessageCombiner<IntWritable, FloatWritable> messageCombiner) {
     this.service = service;
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<IntWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Int2FloatOpenHashMap partitionMap =
           new Int2FloatOpenHashMap((int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
@@ -109,7 +110,7 @@ public class IntFloatMessageStore
           reusableVertexId.set(vertexId);
           reusableMessage.set(message);
           reusableCurrentMessage.set(partitionMap.get(vertexId));
-          combiner.combine(reusableVertexId, reusableCurrentMessage,
+          messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
               reusableMessage);
           message = reusableCurrentMessage.get();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 3272ced..b0a613b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -82,7 +82,7 @@ public class LongByteArrayMessageStore<M extends Writable>
         new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<LongWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
           new Long2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 96ed5b4..b1f32d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages.primitives;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -43,7 +43,7 @@ import java.util.List;
 
 /**
  * Special message store to be used when ids are LongWritable and messages
- * are DoubleWritable and combiner is used.
+ * are DoubleWritable and messageCombiner is used.
  * Uses fastutil primitive maps in order to decrease number of objects and
  * get better performance.
  */
@@ -51,8 +51,8 @@ public class LongDoubleMessageStore
     implements MessageStore<LongWritable, DoubleWritable> {
   /** Map from partition id to map from vertex id to message */
   private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
-  /** Message combiner */
-  private final Combiner<LongWritable, DoubleWritable> combiner;
+  /** Message messageCombiner */
+  private final MessageCombiner<LongWritable, DoubleWritable> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<LongWritable, ?, ?> service;
 
@@ -60,18 +60,19 @@ public class LongDoubleMessageStore
    * Constructor
    *
    * @param service Service worker
-   * @param combiner Message combiner
+   * @param messageCombiner Message messageCombiner
    */
   public LongDoubleMessageStore(
       CentralizedServiceWorker<LongWritable, ?, ?> service,
-      Combiner<LongWritable, DoubleWritable> combiner) {
+      MessageCombiner<LongWritable, DoubleWritable> messageCombiner) {
     this.service = service;
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<LongWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Long2DoubleOpenHashMap partitionMap =
           new Long2DoubleOpenHashMap((int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
@@ -109,7 +110,7 @@ public class LongDoubleMessageStore
           reusableVertexId.set(vertexId);
           reusableMessage.set(message);
           reusableCurrentMessage.set(partitionMap.get(vertexId));
-          combiner.combine(reusableVertexId, reusableCurrentMessage,
+          messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
               reusableMessage);
           message = reusableCurrentMessage.get();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 28f3656..7541418 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -90,7 +90,7 @@ public class NettyWorkerClient<I extends WritableComparable,
   @Override
   public void newSuperstep(SuperstepMetricsRegistry metrics) {
     superstepRequestCounters.clear();
-    superstepRequestCounters.put(RequestType.SEND_VERTEX_REQUEST,
+    superstepRequestCounters.put(RequestType.SEND_WORKER_VERTICES_REQUEST,
         metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
     superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST,
         metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 34a3d1f..0166713 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.comm.netty;
 
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.SendEdgeCache;
@@ -32,8 +35,10 @@ import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
+import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
@@ -45,6 +50,7 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -52,18 +58,10 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
-
 /**
  * Aggregate requests and sends them to the thread-safe NettyClient.  This
  * class is not thread-safe and expected to be used and then thrown away after
@@ -91,8 +89,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
   private final WorkerClient<I, V, E> workerClient;
+  /** Messages sent during the last superstep */
+  private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
+  /** Maximum size of vertices per remote worker to cache before sending. */
+  private final int maxVerticesSizePerWorker;
   /** Maximum size of edges per remote worker to cache before sending. */
   private final int maxEdgesSizePerWorker;
   /** Maximum number of mutations per partition before sending */
@@ -124,9 +126,14 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     this.workerClient = serviceWorker.getWorkerClient();
     this.configuration = conf;
 
-    sendPartitionCache = new SendPartitionCache<I, V, E>(context, conf);
+
+    sendPartitionCache =
+        new SendPartitionCache<I, V, E>(conf, serviceWorker);
     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
-    maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
+    maxMessagesSizePerWorker =
+        GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
+    maxVerticesSizePerWorker =
+        GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
     if (this.configuration.isOneToAllMsgSendingEnabled()) {
       sendMessageCache =
         new SendMessageToAllCache<I, Writable>(conf, serviceWorker,
@@ -136,8 +143,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         new SendMessageCache<I, Writable>(conf, serviceWorker,
           this, maxMessagesSizePerWorker);
     }
-    maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
-    maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf);
+    maxEdgesSizePerWorker =
+        GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
+    maxMutationsPerPartition =
+        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
     this.serviceWorker = serviceWorker;
     this.serverData = serviceWorker.getServerData();
 
@@ -249,15 +258,26 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   }
 
   @Override
-  public void sendVertexRequest(PartitionOwner partitionOwner,
+  public boolean sendVertexRequest(PartitionOwner partitionOwner,
       Vertex<I, V, E> vertex) {
-    Partition<I, V, E> partition =
-        sendPartitionCache.addVertex(partitionOwner, vertex);
-    if (partition == null) {
-      return;
+    // Add the vertex to the cache
+    int workerMessageSize = sendPartitionCache.addVertex(
+        partitionOwner, vertex);
+
+    // Send a request if the cache of outgoing message to
+    // the remote worker 'workerInfo' is full enough to be flushed
+    if (workerMessageSize >= maxVerticesSizePerWorker) {
+      PairList<Integer, ExtendedDataOutput>
+          workerPartitionVertices =
+          sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
+      WritableRequest writableRequest =
+          new SendWorkerVerticesRequest<I, V, E>(
+              configuration, workerPartitionVertices);
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
+      return true;
     }
 
-    sendPartitionRequest(partitionOwner.getWorkerInfo(), partition);
+    return false;
   }
 
   @Override
@@ -390,17 +410,24 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
   @Override
   public void flush() throws IOException {
-    // Execute the remaining send partitions (if any)
-    for (Map.Entry<PartitionOwner, Partition<I, V, E>> entry :
-        sendPartitionCache.getOwnerPartitionMap().entrySet()) {
-      sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
-    }
-    sendPartitionCache.clear();
-
     // Execute the remaining sends messages (if any)
     // including one-to-one and one-to-all messages.
     sendMessageCache.flush();
 
+    // Execute the remaining sends vertices (if any)
+    PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
+        remainingVertexCache = sendPartitionCache.removeAllData();
+    PairList<WorkerInfo,
+        PairList<Integer, ExtendedDataOutput>>.Iterator
+        vertexIterator = remainingVertexCache.getIterator();
+    while (vertexIterator.hasNext()) {
+      vertexIterator.next();
+      WritableRequest writableRequest =
+          new SendWorkerVerticesRequest(
+              configuration, vertexIterator.getCurrentSecond());
+      doRequest(vertexIterator.getCurrentFirst(), writableRequest);
+    }
+
     // Execute the remaining sends edges (if any)
     PairList<WorkerInfo, PairList<Integer,
         ByteArrayVertexIdEdges<I, E>>>
@@ -448,7 +475,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param writableRequest Request to either submit or run locally
    */
   public void doRequest(WorkerInfo workerInfo,
-    WritableRequest writableRequest) {
+                         WritableRequest writableRequest) {
     // If this is local, execute locally
     if (serviceWorker.getWorkerInfo().getTaskId() ==
         workerInfo.getTaskId()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 3473de1..4f6c17b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -161,7 +161,7 @@ public class NettyWorkerServer<I extends WritableComparable,
           getPartitionDestinationVertices(partitionId);
       if (!Iterables.isEmpty(destinations)) {
         Partition<I, V, E> partition =
-            service.getPartitionStore().getPartition(partitionId);
+            service.getPartitionStore().getOrCreatePartition(partitionId);
         for (I vertexId : destinations) {
           if (partition.getVertex(vertexId) == null) {
             if (!resolveVertexIndices.put(partitionId, vertexId)) {
@@ -179,7 +179,7 @@ public class NettyWorkerServer<I extends WritableComparable,
     for (Entry<Integer, Collection<I>> e :
         resolveVertexIndices.asMap().entrySet()) {
       Partition<I, V, E> partition =
-          service.getPartitionStore().getPartition(e.getKey());
+          service.getPartitionStore().getOrCreatePartition(e.getKey());
       for (I vertexIndex : e.getValue()) {
         Vertex<I, V, E> originalVertex =
             partition.getVertex(vertexIndex);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 83b408e..f49a2b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -94,7 +94,7 @@ public class RequestEncoder extends OneToOneEncoder {
       writableRequest.write(outputStream);
     } catch (IndexOutOfBoundsException e) {
       LOG.error("encode: Most likely the size of request was not properly " +
-          "specified - see getSerializedSize() in " +
+          "specified (this buffer is too small) - see getSerializedSize() in " +
           writableRequest.getType().getRequestClass());
       throw new IllegalStateException(e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index a1dcece..7fe2ae7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -34,6 +34,8 @@ public enum RequestType {
   /*end[HADOOP_NON_SECURE]*/
   /** Sending vertices request */
   SEND_VERTEX_REQUEST(SendVertexRequest.class),
+  /** Sending vertices request */
+  SEND_WORKER_VERTICES_REQUEST(SendWorkerVerticesRequest.class),
   /** Sending a partition of messages for next superstep */
   SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
   /** Sending one-to-all messages to a worker for next superstep */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
index e0cb916..863d7ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
@@ -29,7 +29,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * Send a collection of vertices for a partition.
+ * Send a collection of vertices for a partition.  Note that this doesn't
+ * use a cache - might want to optimize in the future.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
new file mode 100644
index 0000000..386e0ee
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIterator;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Send to a worker one or more partitions of vertices
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class SendWorkerVerticesRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendWorkerVerticesRequest.class);
+  /** Worker partitions to be sent */
+  private PairList<Integer, ExtendedDataOutput> workerPartitions;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerVerticesRequest() { }
+
+  /**
+   * Constructor for sending a request.
+   *
+   * @param conf Configuration
+   * @param workerPartitions Partitions to be send in this request
+   */
+  public SendWorkerVerticesRequest(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      PairList<Integer, ExtendedDataOutput> workerPartitions) {
+    this.workerPartitions = workerPartitions;
+    setConf(conf);
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    int numPartitions = input.readInt();
+    workerPartitions = new PairList<Integer, ExtendedDataOutput>();
+    workerPartitions.initialize(numPartitions);
+    while (numPartitions-- > 0) {
+      final int partitionId = input.readInt();
+      ExtendedDataOutput partitionData =
+          WritableUtils.readExtendedDataOutput(input, getConf());
+      workerPartitions.add(partitionId, partitionData);
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(workerPartitions.getSize());
+    PairList<Integer, ExtendedDataOutput>.Iterator
+        iterator = workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      output.writeInt(iterator.getCurrentFirst());
+      WritableUtils.writeExtendedDataOutput(
+          iterator.getCurrentSecond(), output);
+    }
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_VERTICES_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E> serverData) {
+    PairList<Integer, ExtendedDataOutput>.Iterator
+        iterator = workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      VertexIterator<I, V, E> vertexIterator =
+          new VertexIterator<I, V, E>(
+          iterator.getCurrentSecond(), getConf());
+      serverData.getPartitionStore().getOrCreatePartition(
+          iterator.getCurrentFirst()).addPartitionVertices(
+          vertexIterator);
+    }
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = 0;
+    PairList<Integer, ExtendedDataOutput>.Iterator iterator =
+        workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      // 4 bytes for the partition id and 4 bytes for the size
+      size += 8 + iterator.getCurrentSecond().getPos();
+    }
+    return size;
+  }
+}
+


Mime
View raw message