giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject git commit: updated refs/heads/trunk to d31949a
Date Sat, 20 Jul 2013 06:57:12 GMT
Updated Branches:
  refs/heads/trunk 345b3d965 -> d31949a11


GIRAPH-701: Communication improvement using one-to-all message
sending (Bingjing via aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d31949a1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d31949a1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d31949a1

Branch: refs/heads/trunk
Commit: d31949a11d184f337fa1287f0c2b8f769bae82fd
Parents: 345b3d9
Author: Avery Ching <aching@fb.com>
Authored: Fri Jul 19 22:35:23 2013 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Fri Jul 19 23:34:38 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +
 .../java/org/apache/giraph/bsp/BspService.java  |   2 +
 .../java/org/apache/giraph/comm/SendCache.java  | 101 ++++++++--
 .../apache/giraph/comm/SendMessageCache.java    | 190 ++++++++++++++++++-
 .../java/org/apache/giraph/comm/ServerData.java |  12 ++
 .../comm/WorkerClientRequestProcessor.java      |  27 ++-
 .../comm/messages/OneMessagePerVertexStore.java |   1 +
 .../NettyWorkerClientRequestProcessor.java      |  81 ++++----
 .../giraph/comm/requests/RequestType.java       |   3 +
 .../apache/giraph/conf/GiraphConfiguration.java |  16 ++
 .../org/apache/giraph/conf/GiraphConstants.java |   9 +
 .../org/apache/giraph/counters/GiraphStats.java |  16 +-
 .../org/apache/giraph/graph/Computation.java    |  21 +-
 .../apache/giraph/graph/ComputeCallable.java    |  19 +-
 .../org/apache/giraph/graph/GlobalStats.java    |  20 +-
 .../apache/giraph/master/BspServiceMaster.java  |   4 +
 .../org/apache/giraph/metrics/MetricNames.java  |   3 +
 .../apache/giraph/partition/PartitionStats.java |  30 ++-
 .../giraph/utils/ByteArrayVertexIdData.java     |  16 ++
 .../apache/giraph/worker/BspServiceWorker.java  |  15 +-
 .../org/apache/giraph/comm/RequestTest.java     |  48 +++++
 .../java/org/apache/giraph/utils/MockUtils.java |  40 +++-
 .../SimpleTriangleClosingComputationTest.java   |   6 +-
 23 files changed, 585 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3539a00..5e2018d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-701: Communication improvement using one-to-all message
+  sending (Bingjing via aching)
+
   GIRAPH-721: Don't call progress on each edge/vertex loaded (majakabiljo)
 
   GIRAPH-720: Provide a way to change job name (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index ff3f06d..42e8e7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -143,6 +143,8 @@ public abstract class BspService<I extends WritableComparable,
       "_partitionStatsKey";
   /** JSON message count key */
   public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
+  /** JSON message bytes count key */
+  public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
   /** JSON metrics key */
   public static final String JSONOBJ_METRICS_KEY = "_metricsKey";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
index 92d0926..30c07ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
@@ -42,17 +42,21 @@ import java.util.Map;
 @SuppressWarnings("unchecked")
 public abstract class SendCache<I extends WritableComparable, T,
     B extends ByteArrayVertexIdData<I, T>> {
+  /** How big to initially make output streams for each worker's partitions */
+  private final int[] initialBufferSizes;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Service worker */
+  private final CentralizedServiceWorker serviceWorker;
   /** Internal cache */
   private final ByteArrayVertexIdData<I, T>[] dataCache;
   /** Size of data (in bytes) for each worker */
   private final int[] dataSizes;
-  /** How big to initially make output streams for each worker's partitions */
-  private final int[] initialBufferSizes;
+  /** Total number of workers */
+  private final int numWorkers;
   /** List of partition ids belonging to a worker */
   private final Map<WorkerInfo, List<Integer>> workerPartitions =
       Maps.newHashMap();
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor.
@@ -68,7 +72,7 @@ public abstract class SendCache<I extends WritableComparable, T,
                    int maxRequestSize,
                    float additionalRequestSize) {
     this.conf = conf;
-
+    this.serviceWorker = serviceWorker;
     int maxPartition = 0;
     for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
       List<Integer> workerPartitionIds =
@@ -96,6 +100,7 @@ public abstract class SendCache<I extends WritableComparable, T,
       initialBufferSizes[workerInfo.getTaskId()] =
           initialRequestSize / workerPartitions.get(workerInfo).size();
     }
+    numWorkers = maxWorker + 1;
   }
 
   /**
@@ -117,28 +122,65 @@ public abstract class SendCache<I extends WritableComparable, T,
   public int addData(WorkerInfo workerInfo,
                      int partitionId, I destVertexId, T data) {
     // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+      getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(destVertexId, data);
+    // Update the size of cached, outgoing data per worker
+    dataSizes[workerInfo.getTaskId()] +=
+      partitionData.getSize() - originalSize;
+    return dataSizes[workerInfo.getTaskId()];
+  }
+
+  /**
+   * This method is similar to the method above,
+   * but use a serialized id to replace original I type
+   * destVertexId.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @param serializedId The byte array to store the serialized target vertex id
+   * @param idPos The length of bytes of serialized id in the byte array above
+   * @param data Data to send to remote worker
+   * @return The number of bytes added to the target worker
+   */
+  public int addData(WorkerInfo workerInfo, int partitionId,
+    byte[] serializedId, int idPos, T data) {
+    // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+      getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(serializedId, idPos, data);
+    // Update the size of cached, outgoing data per worker
+    dataSizes[workerInfo.getTaskId()] +=
+      partitionData.getSize() - originalSize;
+    return dataSizes[workerInfo.getTaskId()];
+  }
+
+  /**
+   * This method tries to get a partition data from the data cache.
+   * If null, it will create one.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @return The partition data in data cache
+   */
+  private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
+    int partitionId) {
     ByteArrayVertexIdData<I, T> partitionData = dataCache[partitionId];
-    int originalSize = 0;
     if (partitionData == null) {
       partitionData = createByteArrayVertexIdData();
       partitionData.setConf(conf);
       partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]);
       dataCache[partitionId] = partitionData;
-    } else {
-      originalSize = partitionData.getSize();
     }
-    partitionData.add(destVertexId, data);
-
-    // Update the size of cached, outgoing data per worker
-    dataSizes[workerInfo.getTaskId()] +=
-        partitionData.getSize() - originalSize;
-    return dataSizes[workerInfo.getTaskId()];
+    return partitionData;
   }
 
   /**
    * Gets the data for a worker and removes it from the cache.
    *
-   * @param workerInfo the address of the worker who owns the data
+   * @param workerInfo The address of the worker who owns the data
    *                   partitions that are receiving the data
    * @return List of pairs (partitionId, ByteArrayVertexIdData),
    *         where all partition ids belong to workerInfo
@@ -177,7 +219,34 @@ public abstract class SendCache<I extends WritableComparable, T,
     return allData;
   }
 
-  public ImmutableClassesGiraphConfiguration getConf() {
+  protected ImmutableClassesGiraphConfiguration getConf() {
     return conf;
   }
+
+  /**
+   * Get the service worker.
+   *
+   * @return CentralizedServiceWorker
+   */
+  protected CentralizedServiceWorker getServiceWorker() {
+    return serviceWorker;
+  }
+
+  /**
+   * Get the initial buffer size for the messages sent to a worker.
+   *
+   * @param taskId The task ID of a worker.
+   * @return The initial buffer size for a worker.
+   */
+  protected int getSendWorkerInitialBufferSize(int taskId) {
+    return initialBufferSizes[taskId];
+  }
+
+  protected int getNumWorkers() {
+    return this.numWorkers;
+  }
+
+  protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
+    return workerPartitions;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 2eeac18..8df0dda 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -18,13 +18,22 @@
 
 package org.apache.giraph.comm;
 
+import java.util.Iterator;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
 
 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
 import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
@@ -38,22 +47,40 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
  */
 public class SendMessageCache<I extends WritableComparable, M extends Writable>
     extends SendCache<I, M, ByteArrayVertexIdMessages<I, M>> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendMessageCache.class);
+  /** Messages sent during the last superstep */
+  protected long totalMsgsSentInSuperstep = 0;
+  /** Message bytes sent during the last superstep */
+  protected long totalMsgBytesSentInSuperstep = 0;
+  /** Max message size sent to a worker */
+  protected final int maxMessagesSizePerWorker;
+  /** NettyWorkerClientRequestProcessor for message sending */
+  protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
+
   /**
    * Constructor
    *
    * @param conf Giraph configuration
    * @param serviceWorker Service worker
+   * @param processor NettyWorkerClientRequestProcessor
+   * @param maxMsgSize Max message size sent to a worker
    */
   public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceWorker<?, ?, ?> serviceWorker) {
+      CentralizedServiceWorker<?, ?, ?> serviceWorker,
+      NettyWorkerClientRequestProcessor<I, ?, ?> processor,
+      int maxMsgSize) {
     super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
         ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
+    maxMessagesSizePerWorker = maxMsgSize;
+    clientProcessor = processor;
   }
 
   @Override
   public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+       getConf().getOutgoingMessageValueFactory());
   }
 
   /**
@@ -65,12 +92,29 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @param message Message to send to remote worker
    * @return Size of messages for the worker.
    */
-  public int addMessage(WorkerInfo workerInfo,
-                        int partitionId, I destVertexId, M message) {
+  private int addMessage(WorkerInfo workerInfo,
+      int partitionId, I destVertexId, M message) {
     return addData(workerInfo, partitionId, destVertexId, message);
   }
 
   /**
+   * Add a message to the cache with serialized ids.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @param serializedId Serialized vertex id that is ultimate destination
+   * @param idSerializerPos The end position of serialized id in the byte array
+   * @param message Message to send to remote worker
+   * @return Size of messages for the worker.
+   */
+  protected int addMessage(WorkerInfo workerInfo, int partitionId,
+      byte[] serializedId, int idSerializerPos, M message) {
+    return addData(
+      workerInfo, partitionId, serializedId,
+      idSerializerPos, message);
+  }
+
+  /**
    * Gets the messages for a worker and removes it from the cache.
    *
    * @param workerInfo the address of the worker who owns the data
@@ -78,7 +122,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
    *         where all partition ids belong to workerInfo
    */
-  public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+  protected PairList<Integer, ByteArrayVertexIdMessages<I, M>>
   removeWorkerMessages(WorkerInfo workerInfo) {
     return removeWorkerData(workerInfo);
   }
@@ -88,8 +132,142 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    *
    * @return All vertex messages for all partitions
    */
-  public PairList<WorkerInfo, PairList<
+  private PairList<WorkerInfo, PairList<
       Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
     return removeAllData();
   }
+
+  /**
+   * Send a message to a target vertex id.
+   *
+   * @param destVertexId Target vertex id
+   * @param message The message sent to the target
+   */
+  public void sendMessageRequest(I destVertexId, M message) {
+    PartitionOwner owner =
+      getServiceWorker().getVertexPartitionOwner(destVertexId);
+    WorkerInfo workerInfo = owner.getWorkerInfo();
+    final int partitionId = owner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
+        ") to " + destVertexId + " on worker " + workerInfo);
+    }
+    ++totalMsgsSentInSuperstep;
+    // Add the message to the cache
+    int workerMessageSize = addMessage(
+      workerInfo, partitionId, destVertexId, message);
+    // Send a request if the cache of outgoing message to
+    // the remote worker 'workerInfo' is full enough to be flushed
+    if (workerMessageSize >= maxMessagesSizePerWorker) {
+      PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+        workerMessages = removeWorkerMessages(workerInfo);
+      WritableRequest writableRequest =
+        new SendWorkerMessagesRequest<I, M>(workerMessages);
+      totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+      clientProcessor.doRequest(workerInfo, writableRequest);
+      // Notify sending
+      getServiceWorker().getGraphTaskManager().notifySentMessages();
+    }
+  }
+
+  /**
+   * An iterator wrapper on edges to return
+   * target vertex ids.
+   */
+  private class TargetVertexIdIterator implements Iterator<I> {
+    /** An edge iterator */
+    private Iterator<Edge<I, Writable>> edgesIterator;
+
+    /**
+     * Constructor.
+     *
+     * @param vertex The source vertex of the out edges
+     */
+    private TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
+      edgesIterator =
+        ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return edgesIterator.hasNext();
+    }
+
+    @Override
+    public I next() {
+      return edgesIterator.next().getTargetVertexId();
+    }
+
+    @Override
+    public void remove() {
+      // No operation.
+    }
+  }
+
+  /**
+   * Send message to all its neighbors
+   *
+   * @param vertex The source vertex
+   * @param message The message sent to a worker
+   */
+  public void sendMessageToAllRequest(Vertex<I, ?, ?> vertex, M message) {
+    TargetVertexIdIterator targetVertexIterator =
+      new TargetVertexIdIterator(vertex);
+    sendMessageToAllRequest(targetVertexIterator, message);
+  }
+
+  /**
+   * Send message to the target ids in the iterator
+   *
+   * @param vertexIdIterator The iterator of target vertex ids
+   * @param message The message sent to a worker
+   */
+  public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
+    while (vertexIdIterator.hasNext()) {
+      sendMessageRequest(vertexIdIterator.next(), message);
+    }
+  }
+
+  /**
+   * Flush the rest of the messages to the workers.
+   */
+  public void flush() {
+    PairList<WorkerInfo, PairList<Integer,
+        ByteArrayVertexIdMessages<I, M>>>
+    remainingMessageCache = removeAllMessages();
+    PairList<WorkerInfo, PairList<
+        Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+    iterator = remainingMessageCache.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      WritableRequest writableRequest =
+        new SendWorkerMessagesRequest<I, M>(
+          iterator.getCurrentSecond());
+      totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+      clientProcessor.doRequest(
+        iterator.getCurrentFirst(), writableRequest);
+    }
+  }
+
+  /**
+   * Reset the message count per superstep.
+   *
+   * @return The message count sent in last superstep
+   */
+  public long resetMessageCount() {
+    long messagesSentInSuperstep = totalMsgsSentInSuperstep;
+    totalMsgsSentInSuperstep = 0;
+    return messagesSentInSuperstep;
+  }
+
+  /**
+   * Reset the message bytes count per superstep.
+   *
+   * @return The message count sent in last superstep
+   */
+  public long resetMessageBytesCount() {
+    long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep;
+    totalMsgBytesSentInSuperstep = 0;
+    return messageBytesSentInSuperstep;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index a50f673..39bf504 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -80,6 +80,8 @@ public class ServerData<I extends WritableComparable,
    * Holds old aggregators from previous superstep
    */
   private final AllAggregatorServerData allAggregatorData;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
 
   /**
    * Constructor.
@@ -95,6 +97,7 @@ public class ServerData<I extends WritableComparable,
       MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
           messageStoreFactory,
       Mapper<?, ?, ?, ?>.Context context) {
+    this.serviceWorker = service;
     this.conf = conf;
     this.messageStoreFactory = messageStoreFactory;
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
@@ -188,4 +191,13 @@ public class ServerData<I extends WritableComparable,
   public AllAggregatorServerData getAllAggregatorData() {
     return allAggregatorData;
   }
+
+  /**
+   * Get the reference of the service worker.
+   *
+   * @return CentralizedServiceWorker
+   */
+  public CentralizedServiceWorker<I, V, E> getServiceWorker() {
+    return this.serviceWorker;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index 89fb3e4..9bdf9ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * Aggregates IPC requests and sends them off
@@ -41,9 +42,24 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    *
    * @param destVertexId Destination vertex id.
    * @param message Message to send.
-   * @return true if any network I/O occurred.
    */
-  boolean sendMessageRequest(I destVertexId, Writable message);
+  void sendMessageRequest(I destVertexId, Writable message);
+
+  /**
+   * Sends a message through all edges to all destinations.
+   *
+   * @param vertex The source vertex.
+   * @param message  Message to send.
+   */
+  void sendMessageToAllRequest(Vertex<I, V, E> vertex, Writable message);
+
+  /**
+   * Sends a message to the targets in the iterator.
+   *
+   * @param vertexIdIterator The iterator of target vertex ids.
+   * @param message  Message to send.
+   */
+  void sendMessageToAllRequest(Iterator<I> vertexIdIterator, Writable message);
 
   /**
    * Sends a vertex to the appropriate partition owner
@@ -126,4 +142,11 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    * @return Number of messages sent before the reset.
    */
   long resetMessageCount();
+
+  /**
+   * Get the message bytes sent during this superstep and clear them.
+   *
+   * @return Bytes of messages sent before the reset.
+   */
+  long resetMessageBytesCount();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index bedaf48..29260df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -89,6 +89,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       }
     }
   }
+
   @Override
   protected Iterable<M> getMessagesAsIterable(M message) {
     return Collections.singleton(message);

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 7ce0083..34a3d1f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -21,6 +21,7 @@ import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.SendEdgeCache;
 import org.apache.giraph.comm.SendMessageCache;
+import org.apache.giraph.comm.SendMessageToAllCache;
 import org.apache.giraph.comm.SendMutationsCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
@@ -31,7 +32,6 @@ import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -57,6 +57,7 @@ import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.util.PercentGauge;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 
 import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
@@ -90,8 +91,6 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
   private final WorkerClient<I, V, E> workerClient;
-  /** Messages sent during the last superstep */
-  private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
   /** Maximum size of edges per remote worker to cache before sending. */
@@ -126,9 +125,17 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     this.configuration = conf;
 
     sendPartitionCache = new SendPartitionCache<I, V, E>(context, conf);
-    sendMessageCache = new SendMessageCache<I, Writable>(conf, serviceWorker);
     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
     maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
+    if (this.configuration.isOneToAllMsgSendingEnabled()) {
+      sendMessageCache =
+        new SendMessageToAllCache<I, Writable>(conf, serviceWorker,
+          this, maxMessagesSizePerWorker);
+    } else {
+      sendMessageCache =
+        new SendMessageCache<I, Writable>(conf, serviceWorker,
+          this, maxMessagesSizePerWorker);
+    }
     maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
     maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf);
     this.serviceWorker = serviceWorker;
@@ -159,34 +166,20 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   }
 
   @Override
-  public boolean sendMessageRequest(I destVertexId, Writable message) {
-    PartitionOwner owner =
-        serviceWorker.getVertexPartitionOwner(destVertexId);
-    WorkerInfo workerInfo = owner.getWorkerInfo();
-    final int partitionId = owner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
-          ") to " + destVertexId + " on worker " + workerInfo);
-    }
-    ++totalMsgsSentInSuperstep;
+  public void sendMessageRequest(I destVertexId, Writable message) {
+    this.sendMessageCache.sendMessageRequest(destVertexId, message);
+  }
 
-    // Add the message to the cache
-    int workerMessageSize = sendMessageCache.addMessage(
-        workerInfo, partitionId, destVertexId, message);
-
-    // Send a request if the cache of outgoing message to
-    // the remote worker 'workerInfo' is full enough to be flushed
-    if (workerMessageSize >= maxMessagesSizePerWorker) {
-      PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>
-          workerMessages =
-          sendMessageCache.removeWorkerMessages(workerInfo);
-      WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, Writable>(workerMessages);
-      doRequest(workerInfo, writableRequest);
-      return true;
-    }
+  @Override
+  public void sendMessageToAllRequest(
+    Vertex<I, V, E> vertex, Writable message) {
+    this.sendMessageCache.sendMessageToAllRequest(vertex, message);
+  }
 
-    return false;
+  @Override
+  public void sendMessageToAllRequest(
+    Iterator<I> vertexIdIterator, Writable message) {
+    this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message);
   }
 
   @Override
@@ -405,19 +398,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     sendPartitionCache.clear();
 
     // Execute the remaining sends messages (if any)
-    PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessages<I, Writable>>>
-        remainingMessageCache = sendMessageCache.removeAllMessages();
-    PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>>.Iterator
-        iterator = remainingMessageCache.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, Writable>(
-              iterator.getCurrentSecond());
-      doRequest(iterator.getCurrentFirst(), writableRequest);
-    }
+    // including one-to-one and one-to-all messages.
+    sendMessageCache.flush();
 
     // Execute the remaining sends edges (if any)
     PairList<WorkerInfo, PairList<Integer,
@@ -451,9 +433,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
   @Override
   public long resetMessageCount() {
-    long messagesSentInSuperstep = totalMsgsSentInSuperstep;
-    totalMsgsSentInSuperstep = 0;
-    return messagesSentInSuperstep;
+    return this.sendMessageCache.resetMessageCount();
+  }
+
+  @Override
+  public long resetMessageBytesCount() {
+    return this.sendMessageCache.resetMessageBytesCount();
   }
 
   /**
@@ -462,8 +447,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param workerInfo Worker info
    * @param writableRequest Request to either submit or run locally
    */
-  private void doRequest(WorkerInfo workerInfo,
-                         WritableRequest writableRequest) {
+  public void doRequest(WorkerInfo workerInfo,
+    WritableRequest writableRequest) {
     // If this is local, execute locally
     if (serviceWorker.getWorkerInfo().getTaskId() ==
         workerInfo.getTaskId()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 4129fb8..a1dcece 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -36,6 +36,9 @@ public enum RequestType {
   SEND_VERTEX_REQUEST(SendVertexRequest.class),
   /** Sending a partition of messages for next superstep */
   SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
+  /** Sending one-to-all messages to a worker for next superstep */
+  SEND_WORKER_ONETOALL_MESSAGES_REQUEST(
+    SendWorkerOneToAllMessagesRequest.class),
   /**
    * Sending a partition of messages for current superstep
    * (used during partition exchange)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 21cf245..de0539b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1036,4 +1036,20 @@ public class GiraphConfiguration extends Configuration
     value = value.replace("%USER%", get("user.name", "unknown_user"));
     return value;
   }
+
+  /**
+   * Enable communication optimization for one-to-all messages.
+   */
+  public void enableOneToAllMsgSending() {
+    ONE_TO_ALL_MSG_SENDING.set(this, true);
+  }
+
+  /**
+   * Return if one-to-all messsage sending is enabled.
+   *
+   * @return True if this option is enabled.
+   */
+  public boolean isOneToAllMsgSendingEnabled() {
+    return ONE_TO_ALL_MSG_SENDING.isTrue(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c4cc96f..f6233e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -869,5 +869,14 @@ public interface GiraphConstants {
           "The application will not mutate the graph topology (the edges). " +
           "It is used to optimise out-of-core graph, by not writing back " +
           "edges every time.");
+
+  /**
+   * This option will enable communication optimization for one-to-all
+   * message sending. For multiple target ids on the same machine,
+   * we only send one message to all the targets.
+   */
+  BooleanConfOption ONE_TO_ALL_MSG_SENDING =
+    new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " +
+        "one-to-all message sending strategy");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index 3f25508..bef0523 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -40,6 +40,8 @@ public class GiraphStats extends HadoopCountersBase {
   public static final String EDGES_NAME = "Aggregate edges";
   /** sent messages counter name */
   public static final String SENT_MESSAGES_NAME = "Sent messages";
+  /** sent messages counter name */
+  public static final String SENT_MESSAGE_BYTES_NAME = "Sent message bytes";
   /** workers counter name */
   public static final String CURRENT_WORKERS_NAME = "Current workers";
   /** current master partition task counter name */
@@ -68,8 +70,10 @@ public class GiraphStats extends HadoopCountersBase {
   private static final int CURRENT_MASTER_TASK_PARTITION = 6;
   /** Last checkpointed superstep */
   private static final int LAST_CHECKPOINTED_SUPERSTEP = 7;
+  /** Sent message bytes counter */
+  private static final int SENT_MESSAGE_BYTES = 8;
   /** Number of counters in this class */
-  private static final int NUM_COUNTERS = 8;
+  private static final int NUM_COUNTERS = 9;
 
   /** All the counters stored */
   private final GiraphHadoopCounter[] counters;
@@ -87,6 +91,7 @@ public class GiraphStats extends HadoopCountersBase {
     counters[FINISHED_VERTICES] = getCounter(FINISHED_VERTICES_NAME);
     counters[EDGES] = getCounter(EDGES_NAME);
     counters[SENT_MESSAGES] = getCounter(SENT_MESSAGES_NAME);
+    counters[SENT_MESSAGE_BYTES] = getCounter(SENT_MESSAGE_BYTES_NAME);
     counters[CURRENT_WORKERS] = getCounter(CURRENT_WORKERS_NAME);
     counters[CURRENT_MASTER_TASK_PARTITION] =
         getCounter(CURRENT_MASTER_PARTITION_TASK_NAME);
@@ -158,6 +163,15 @@ public class GiraphStats extends HadoopCountersBase {
   }
 
   /**
+   * Get SentMessageBytes counter
+   *
+   * @return SentMessageBytes counter
+   */
+  public GiraphHadoopCounter getSentMessageBytes() {
+    return counters[SENT_MESSAGE_BYTES];
+  }
+
+  /**
    * Get CurrentWorkers counter
    *
    * @return CurrentWorkers counter

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 87d5879..34b0109 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * Basic abstract class for writing a BSP application for computation.
@@ -155,9 +156,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param message Message data to send
    */
   public void sendMessage(I id, M2 message) {
-    if (workerClientRequestProcessor.sendMessageRequest(id, message)) {
-      graphTaskManager.notifySentMessages();
-    }
+    workerClientRequestProcessor.sendMessageRequest(id, message);
   }
 
   /**
@@ -167,9 +166,19 @@ public abstract class Computation<I extends WritableComparable,
    * @param message Message sent to all edges.
    */
   public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
-    for (Edge<I, E> edge : vertex.getEdges()) {
-      sendMessage(edge.getTargetVertexId(), message);
-    }
+    workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
+  }
+
+  /**
+   * Send a message to multiple target vertex ids in the iterator.
+   *
+   * @param vertexIdIterator An iterator to multiple target vertex ids.
+   * @param message Message sent to all targets in the iterator.
+   */
+  public void sendMessageToMultipleEdges(
+    Iterator<I> vertexIdIterator, M2 message) {
+    workerClientRequestProcessor.sendMessageToAllRequest(
+      vertexIdIterator, message);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index a9bf3fd..77d9f5e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -96,6 +96,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   // Per-Superstep Metrics
   /** Messages sent */
   private final Counter messagesSentCounter;
+  /** Message bytes sent */
+  private final Counter messageBytesSentCounter;
   /** Timer for single compute() call */
   private final Timer computeOneTimer;
 
@@ -127,6 +129,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     // not long-lived, so just instantiating in the constructor is good enough.
     computeOneTimer = metrics.getTimer(TimerDesc.COMPUTE_ONE);
     messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
+    messageBytesSentCounter =
+      metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
   }
 
   @Override
@@ -164,6 +168,10 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
         partitionStats.addMessagesSentCount(partitionMsgs);
         messagesSentCounter.inc(partitionMsgs);
+        long partitionMsgBytes =
+          workerClientRequestProcessor.resetMessageBytesCount();
+        partitionStats.addMessageBytesSentCount(partitionMsgBytes);
+        messageBytesSentCounter.inc(partitionMsgBytes);
         timedLogger.info("call: Completed " +
             partitionStatsList.size() + " partitions, " +
             partitionIdQueue.size() + " remaining " +
@@ -193,6 +201,15 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     }
     try {
       workerClientRequestProcessor.flush();
+      // The messages flushed out from the cache is
+      // from the last partition processed
+      if (partitionStatsList.size() > 0) {
+        long partitionMsgBytes =
+          workerClientRequestProcessor.resetMessageBytesCount();
+        partitionStatsList.get(partitionStatsList.size() - 1).
+          addMessageBytesSentCount(partitionMsgBytes);
+        messageBytesSentCounter.inc(partitionMsgBytes);
+      }
       aggregatorUsage.finishThreadComputation();
     } catch (IOException e) {
       throw new IllegalStateException("call: Flushing failed.", e);
@@ -211,7 +228,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       Computation<I, V, E, M1, M2> computation,
       Partition<I, V, E> partition) throws IOException, InterruptedException {
     PartitionStats partitionStats =
-        new PartitionStats(partition.getId(), 0, 0, 0, 0);
+        new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
     // Make sure this is thread-safe across runs
     synchronized (partition) {
       for (Vertex<I, V, E> vertex : partition) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index f3cbea2..bc56c9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -37,6 +37,8 @@ public class GlobalStats implements Writable {
   private long edgeCount = 0;
   /** All messages sent in the last superstep */
   private long messageCount = 0;
+  /** All message bytes sent in the last superstep */
+  private long messageBytesCount = 0;
   /** Whether the computation should be halted */
   private boolean haltComputation = false;
 
@@ -67,6 +69,10 @@ public class GlobalStats implements Writable {
     return messageCount;
   }
 
+  public long getMessageBytesCount() {
+    return messageBytesCount;
+  }
+
   public boolean getHaltComputation() {
     return haltComputation;
   }
@@ -84,12 +90,22 @@ public class GlobalStats implements Writable {
     this.messageCount += messageCount;
   }
 
+  /**
+   * Add messages to the global stats.
+   *
+   * @param msgBytesCount Number of message bytes to be added.
+   */
+  public void addMessageBytesCount(long msgBytesCount) {
+    this.messageBytesCount += msgBytesCount;
+  }
+
   @Override
   public void readFields(DataInput input) throws IOException {
     vertexCount = input.readLong();
     finishedVertexCount = input.readLong();
     edgeCount = input.readLong();
     messageCount = input.readLong();
+    messageBytesCount = input.readLong();
     haltComputation = input.readBoolean();
   }
 
@@ -99,6 +115,7 @@ public class GlobalStats implements Writable {
     output.writeLong(finishedVertexCount);
     output.writeLong(edgeCount);
     output.writeLong(messageCount);
+    output.writeLong(messageBytesCount);
     output.writeBoolean(haltComputation);
   }
 
@@ -106,6 +123,7 @@ public class GlobalStats implements Writable {
   public String toString() {
     return "(vtx=" + vertexCount + ",finVtx=" +
         finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
-        messageCount + ",haltComputation=" + haltComputation + ")";
+        messageCount + ",msgBytesCount=" +
+          messageBytesCount + ",haltComputation=" + haltComputation + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 1d3cff0..d08495b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -954,6 +954,9 @@ public class BspServiceMaster<I extends WritableComparable,
         globalStats.addMessageCount(
             workerFinishedInfoObj.getLong(
                 JSONOBJ_NUM_MESSAGES_KEY));
+        globalStats.addMessageBytesCount(
+          workerFinishedInfoObj.getLong(
+              JSONOBJ_NUM_MESSAGE_BYTES_KEY));
         if (conf.metricsEnabled() &&
             workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
           WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
@@ -1959,6 +1962,7 @@ public class BspServiceMaster<I extends WritableComparable,
     gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
     gs.getEdges().setValue(globalStats.getEdgeCount());
     gs.getSentMessages().setValue(globalStats.getMessageCount());
+    gs.getSentMessageBytes().setValue(globalStats.getMessageBytesCount());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
index cc237ac..f731bbc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
@@ -68,6 +68,9 @@ public interface MetricNames {
   /** Counter of messages sent in superstep */
   String MESSAGES_SENT = "messages-sent";
 
+  /** Counter of messages sent in superstep */
+  String MESSAGE_BYTES_SENT = "message-bytes-sent";
+
   /** Histogram for vertices in mutations requests */
   String VERTICES_IN_MUTATION_REQUEST = "vertices-per-mutations-request";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
index b8eeca9..624e385 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
@@ -38,6 +38,8 @@ public class PartitionStats implements Writable {
   private long edgeCount = 0;
   /** Messages sent from this partition */
   private long messagesSentCount = 0;
+  /** Message byetes sent from this partition */
+  private long messageBytesSentCount = 0;
 
   /**
    * Default constructor for reflection.
@@ -52,17 +54,20 @@ public class PartitionStats implements Writable {
    * @param finishedVertexCount Finished vertex count.
    * @param edgeCount Edge count.
    * @param messagesSentCount Number of messages sent
+   * @param messageBytesSentCount Number of message bytes sent
    */
   public PartitionStats(int partitionId,
       long vertexCount,
       long finishedVertexCount,
       long edgeCount,
-      long messagesSentCount) {
+      long messagesSentCount,
+      long messageBytesSentCount) {
     this.partitionId = partitionId;
     this.vertexCount = vertexCount;
     this.finishedVertexCount = finishedVertexCount;
     this.edgeCount = edgeCount;
     this.messagesSentCount = messagesSentCount;
+    this.messageBytesSentCount = messageBytesSentCount;
   }
 
   /**
@@ -151,6 +156,24 @@ public class PartitionStats implements Writable {
     return messagesSentCount;
   }
 
+  /**
+   * Add message bytes to messageBytesSentCount.
+   *
+   * @param messageBytesSentCount Number of message bytes to add.
+   */
+  public void addMessageBytesSentCount(long messageBytesSentCount) {
+    this.messageBytesSentCount += messageBytesSentCount;
+  }
+
+  /**
+   * Get the message bytes sent count.
+   *
+   * @return Message bytes sent count.
+   */
+  public long getMessageBytesSentCount() {
+    return messageBytesSentCount;
+  }
+
   @Override
   public void readFields(DataInput input) throws IOException {
     partitionId = input.readInt();
@@ -158,6 +181,7 @@ public class PartitionStats implements Writable {
     finishedVertexCount = input.readLong();
     edgeCount = input.readLong();
     messagesSentCount = input.readLong();
+    messageBytesSentCount = input.readLong();
   }
 
   @Override
@@ -167,12 +191,14 @@ public class PartitionStats implements Writable {
     output.writeLong(finishedVertexCount);
     output.writeLong(edgeCount);
     output.writeLong(messagesSentCount);
+    output.writeLong(messageBytesSentCount);
   }
 
   @Override
   public String toString() {
     return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
         finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
-        messagesSentCount + ")";
+        messagesSentCount + ",msgBytesSent=" +
+          messageBytesSentCount + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 26c547b..5c56038 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -101,6 +101,22 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   }
 
   /**
+   * Add a serialized vertex id and data.
+   *
+   * @param serializedId The bye array which holds the serialized id.
+   * @param idPos The end position of the serialized id in the byte array.
+   * @param data Data
+   */
+  public void add(byte[] serializedId, int idPos, T data) {
+    try {
+      extendedDataOutput.write(serializedId, 0, idPos);
+      writeData(extendedDataOutput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  /**
    * Get the number of bytes used.
    *
    * @return Bytes used

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 52bac3f..da1e7fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -573,7 +573,7 @@ public class BspServiceWorker<I extends WritableComparable,
               partition.getVertexCount(),
               0,
               partition.getEdgeCount(),
-              0);
+              0, 0);
       partitionStatsList.add(partitionStats);
       getPartitionStore().putPartition(partition);
     }
@@ -741,9 +741,11 @@ public class BspServiceWorker<I extends WritableComparable,
     getGraphTaskManager().notifyFinishedCommunication();
 
     long workerSentMessages = 0;
+    long workerSentMessageBytes = 0;
     long localVertices = 0;
     for (PartitionStats partitionStats : partitionStatsList) {
       workerSentMessages += partitionStats.getMessagesSentCount();
+      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
       localVertices += partitionStats.getVertexCount();
     }
 
@@ -756,10 +758,12 @@ public class BspServiceWorker<I extends WritableComparable,
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
           ", messages = " + workerSentMessages + " " +
+          ", message bytes = " + workerSentMessageBytes + " , " +
           MemoryUtils.getRuntimeMemoryStats());
     }
 
-    writeFinshedSuperstepInfoToZK(partitionStatsList, workerSentMessages);
+    writeFinshedSuperstepInfoToZK(partitionStatsList,
+      workerSentMessages, workerSentMessageBytes);
 
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "finishSuperstep: (waiting for rest " +
@@ -854,9 +858,12 @@ public class BspServiceWorker<I extends WritableComparable,
    *
    * @param partitionStatsList List of partition stats from superstep.
    * @param workerSentMessages Number of messages sent in superstep.
+   * @param workerSentMessageBytes Number of message bytes sent
+   *                               in superstep.
    */
   private void writeFinshedSuperstepInfoToZK(
-      List<PartitionStats> partitionStatsList, long workerSentMessages) {
+      List<PartitionStats> partitionStatsList, long workerSentMessages,
+      long workerSentMessageBytes) {
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, getPartitionStore());
@@ -873,6 +880,8 @@ public class BspServiceWorker<I extends WritableComparable,
       workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
           Base64.encodeBytes(partitionStatsBytes));
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
+        workerSentMessageBytes);
       workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
           Base64.encodeBytes(metricsBytes));
     } catch (JSONException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 2e60c09..115c108 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -24,6 +24,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -35,7 +36,9 @@ import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayOneToAllMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
@@ -189,6 +192,51 @@ public class RequestTest {
   }
 
   @Test
+  public void sendWorkerOneToAllMessagesRequest() throws IOException {
+    // Data to send
+    ByteArrayOneToAllMessages<IntWritable, IntWritable>
+        dataToSend = new ByteArrayOneToAllMessages<
+        IntWritable, IntWritable>(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+    dataToSend.setConf(conf);
+    dataToSend.initialize();
+    ExtendedDataOutput output = conf.createExtendedDataOutput();
+    for (int i = 1; i <= 7; ++i) {
+      IntWritable vertexId = new IntWritable(i);
+      vertexId.write(output);
+    }
+    dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1));
+
+    // Send the request
+    SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
+      new SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable>(dataToSend, conf);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    Iterable<IntWritable> vertices =
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
+    int keySum = 0;
+    int messageSum = 0;
+    for (IntWritable vertexId : vertices) {
+      keySum += vertexId.get();
+      Iterable<IntWritable> messages =
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
+      synchronized (messages) {
+        for (IntWritable message : messages) {
+          messageSum += message.get();
+        }
+      }
+    }
+    assertEquals(28, keySum);
+    assertEquals(7, messageSum);
+  }
+
+  @Test
   public void sendPartitionMutationsRequest() throws IOException {
     // Data to send
     int partitionId = 19;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index bc5b5e2..97e88f8 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -30,6 +30,10 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.BasicPartitionOwner;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.partition.SimplePartitionStore;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -94,6 +98,10 @@ public class MockUtils {
             Mockito.verify(workerClientRequestProcessor).sendMessageRequest
                 (targetVertexId, message);
         }
+        
+        public void verifyMessageSentToAllEdges(Vertex<I, V, E> vertex, M message) {
+          Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message);
+      }
 
         /** assert that the test vertex has sent no message to a particular vertex */
         public void verifyNoMessageSent() {
@@ -149,6 +157,12 @@ public class MockUtils {
     return env;
   }
 
+  /**
+   * Prepare a mocked CentralizedServiceWorker.
+   *
+   * @param numOfPartitions The number of partitions
+   * @return CentralizedServiceWorker
+   */
   public static CentralizedServiceWorker<IntWritable, IntWritable,
       IntWritable> mockServiceGetVertexPartitionOwner(final int
       numOfPartitions) {
@@ -167,14 +181,24 @@ public class MockUtils {
     return service;
   }
 
+  /**
+   * Prepare a ServerData object.
+   *
+   * @param conf Configuration
+   * @param context Context
+   * @return ServerData
+   */
   public static ServerData<IntWritable, IntWritable, IntWritable>
-  createNewServerData(ImmutableClassesGiraphConfiguration conf,
-      Mapper.Context context) {
-    return new ServerData<IntWritable, IntWritable, IntWritable>(
-        Mockito.mock(CentralizedServiceWorker.class),
-        conf,
-        ByteArrayMessagesPerVertexStore.newFactory(
-            MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-        context);
+    createNewServerData(
+    ImmutableClassesGiraphConfiguration conf, Mapper.Context context) {
+    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> serviceWorker =
+      MockUtils.mockServiceGetVertexPartitionOwner(1);
+    ServerData<IntWritable, IntWritable, IntWritable> serverData =
+      new ServerData<IntWritable, IntWritable, IntWritable>(
+      serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory(
+        serviceWorker, conf), context);
+    // Here we add a partition to simulate the case that there is one partition.
+    serverData.getPartitionStore().addPartition(new SimplePartition());
+    return serverData;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java
index 7346745..b42d504 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java
@@ -59,10 +59,8 @@ public class SimpleTriangleClosingComputationTest {
     computation.compute(vertex, Lists.<IntWritable>newArrayList(
       new IntWritable(83), new IntWritable(42)));
 
-    env.verifyMessageSent(new IntWritable(5), new IntWritable(5));
-    env.verifyMessageSent(new IntWritable(5), new IntWritable(7));
-    env.verifyMessageSent(new IntWritable(7), new IntWritable(5));
-    env.verifyMessageSent(new IntWritable(7), new IntWritable(7));
+    env.verifyMessageSentToAllEdges(vertex, new IntWritable(5));
+    env.verifyMessageSentToAllEdges(vertex, new IntWritable(7));
   }
 
   /** Test behavior of compute() with incoming messages (superstep 1) */


Mime
View raw message