Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C651910F81 for ; Sat, 20 Jul 2013 06:57:27 +0000 (UTC) Received: (qmail 75354 invoked by uid 500); 20 Jul 2013 06:57:26 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 75295 invoked by uid 500); 20 Jul 2013 06:57:15 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 75288 invoked by uid 99); 20 Jul 2013 06:57:13 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Jul 2013 06:57:13 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3EED98AF906; Sat, 20 Jul 2013 06:57:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aching@apache.org To: commits@giraph.apache.org Message-Id: <06cbb0c9d0654cbfa9c74248fb5ae971@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: updated refs/heads/trunk to d31949a Date: Sat, 20 Jul 2013 06:57:12 +0000 (UTC) Updated Branches: refs/heads/trunk 345b3d965 -> d31949a11 GIRAPH-701: Communication improvement using one-to-all message sending (Bingjing via aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d31949a1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d31949a1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d31949a1 Branch: refs/heads/trunk Commit: d31949a11d184f337fa1287f0c2b8f769bae82fd Parents: 345b3d9 Author: Avery Ching Authored: Fri Jul 19 22:35:23 2013 -0700 Committer: Avery Ching Committed: Fri Jul 19 23:34:38 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 + .../java/org/apache/giraph/bsp/BspService.java | 2 + .../java/org/apache/giraph/comm/SendCache.java | 101 ++++++++-- .../apache/giraph/comm/SendMessageCache.java | 190 ++++++++++++++++++- .../java/org/apache/giraph/comm/ServerData.java | 12 ++ .../comm/WorkerClientRequestProcessor.java | 27 ++- .../comm/messages/OneMessagePerVertexStore.java | 1 + .../NettyWorkerClientRequestProcessor.java | 81 ++++---- .../giraph/comm/requests/RequestType.java | 3 + .../apache/giraph/conf/GiraphConfiguration.java | 16 ++ .../org/apache/giraph/conf/GiraphConstants.java | 9 + .../org/apache/giraph/counters/GiraphStats.java | 16 +- .../org/apache/giraph/graph/Computation.java | 21 +- .../apache/giraph/graph/ComputeCallable.java | 19 +- .../org/apache/giraph/graph/GlobalStats.java | 20 +- .../apache/giraph/master/BspServiceMaster.java | 4 + .../org/apache/giraph/metrics/MetricNames.java | 3 + .../apache/giraph/partition/PartitionStats.java | 30 ++- .../giraph/utils/ByteArrayVertexIdData.java | 16 ++ .../apache/giraph/worker/BspServiceWorker.java | 15 +- .../org/apache/giraph/comm/RequestTest.java | 48 +++++ .../java/org/apache/giraph/utils/MockUtils.java | 40 +++- .../SimpleTriangleClosingComputationTest.java | 6 +- 23 files changed, 585 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 3539a00..5e2018d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-701: Communication improvement using one-to-all message + sending (Bingjing via aching) + GIRAPH-721: Don't call progress on each edge/vertex loaded (majakabiljo) GIRAPH-720: Provide a way to change job name (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index ff3f06d..42e8e7e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -143,6 +143,8 @@ public abstract class BspService> { + /** How big to initially make output streams for each worker's partitions */ + private final int[] initialBufferSizes; + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration conf; + /** Service worker */ + private final CentralizedServiceWorker serviceWorker; /** Internal cache */ private final ByteArrayVertexIdData[] dataCache; /** Size of data (in bytes) for each worker */ private final int[] dataSizes; - /** How big to initially make output streams for each worker's partitions */ - private final int[] initialBufferSizes; + /** Total number of workers */ + private final int numWorkers; /** List of partition ids belonging to a worker */ private final Map> workerPartitions = Maps.newHashMap(); - /** Giraph configuration */ - private final ImmutableClassesGiraphConfiguration conf; /** * Constructor. @@ -68,7 +72,7 @@ public abstract class SendCache workerPartitionIds = @@ -96,6 +100,7 @@ public abstract class SendCache partitionData = + getPartitionData(workerInfo, partitionId); + int originalSize = partitionData.getSize(); + partitionData.add(destVertexId, data); + // Update the size of cached, outgoing data per worker + dataSizes[workerInfo.getTaskId()] += + partitionData.getSize() - originalSize; + return dataSizes[workerInfo.getTaskId()]; + } + + /** + * This method is similar to the method above, + * but use a serialized id to replace original I type + * destVertexId. + * + * @param workerInfo The remote worker destination + * @param partitionId The remote Partition this message belongs to + * @param serializedId The byte array to store the serialized target vertex id + * @param idPos The length of bytes of serialized id in the byte array above + * @param data Data to send to remote worker + * @return The number of bytes added to the target worker + */ + public int addData(WorkerInfo workerInfo, int partitionId, + byte[] serializedId, int idPos, T data) { + // Get the data collection + ByteArrayVertexIdData partitionData = + getPartitionData(workerInfo, partitionId); + int originalSize = partitionData.getSize(); + partitionData.add(serializedId, idPos, data); + // Update the size of cached, outgoing data per worker + dataSizes[workerInfo.getTaskId()] += + partitionData.getSize() - originalSize; + return dataSizes[workerInfo.getTaskId()]; + } + + /** + * This method tries to get a partition data from the data cache. + * If null, it will create one. + * + * @param workerInfo The remote worker destination + * @param partitionId The remote Partition this message belongs to + * @return The partition data in data cache + */ + private ByteArrayVertexIdData getPartitionData(WorkerInfo workerInfo, + int partitionId) { ByteArrayVertexIdData partitionData = dataCache[partitionId]; - int originalSize = 0; if (partitionData == null) { partitionData = createByteArrayVertexIdData(); partitionData.setConf(conf); partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]); dataCache[partitionId] = partitionData; - } else { - originalSize = partitionData.getSize(); } - partitionData.add(destVertexId, data); - - // Update the size of cached, outgoing data per worker - dataSizes[workerInfo.getTaskId()] += - partitionData.getSize() - originalSize; - return dataSizes[workerInfo.getTaskId()]; + return partitionData; } /** * Gets the data for a worker and removes it from the cache. * - * @param workerInfo the address of the worker who owns the data + * @param workerInfo The address of the worker who owns the data * partitions that are receiving the data * @return List of pairs (partitionId, ByteArrayVertexIdData), * where all partition ids belong to workerInfo @@ -177,7 +219,34 @@ public abstract class SendCache> getWorkerPartitions() { + return workerPartitions; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java index 2eeac18..8df0dda 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java @@ -18,13 +18,22 @@ package org.apache.giraph.comm; +import java.util.Iterator; + import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; +import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; +import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.PairList; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE; import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE; @@ -38,22 +47,40 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE; */ public class SendMessageCache extends SendCache> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SendMessageCache.class); + /** Messages sent during the last superstep */ + protected long totalMsgsSentInSuperstep = 0; + /** Message bytes sent during the last superstep */ + protected long totalMsgBytesSentInSuperstep = 0; + /** Max message size sent to a worker */ + protected final int maxMessagesSizePerWorker; + /** NettyWorkerClientRequestProcessor for message sending */ + protected final NettyWorkerClientRequestProcessor clientProcessor; + /** * Constructor * * @param conf Giraph configuration * @param serviceWorker Service worker + * @param processor NettyWorkerClientRequestProcessor + * @param maxMsgSize Max message size sent to a worker */ public SendMessageCache(ImmutableClassesGiraphConfiguration conf, - CentralizedServiceWorker serviceWorker) { + CentralizedServiceWorker serviceWorker, + NettyWorkerClientRequestProcessor processor, + int maxMsgSize) { super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf), ADDITIONAL_MSG_REQUEST_SIZE.get(conf)); + maxMessagesSizePerWorker = maxMsgSize; + clientProcessor = processor; } @Override public ByteArrayVertexIdMessages createByteArrayVertexIdData() { return new ByteArrayVertexIdMessages( - getConf().getOutgoingMessageValueFactory()); + getConf().getOutgoingMessageValueFactory()); } /** @@ -65,12 +92,29 @@ public class SendMessageCache * @param message Message to send to remote worker * @return Size of messages for the worker. */ - public int addMessage(WorkerInfo workerInfo, - int partitionId, I destVertexId, M message) { + private int addMessage(WorkerInfo workerInfo, + int partitionId, I destVertexId, M message) { return addData(workerInfo, partitionId, destVertexId, message); } /** + * Add a message to the cache with serialized ids. + * + * @param workerInfo The remote worker destination + * @param partitionId The remote Partition this message belongs to + * @param serializedId Serialized vertex id that is ultimate destination + * @param idSerializerPos The end position of serialized id in the byte array + * @param message Message to send to remote worker + * @return Size of messages for the worker. + */ + protected int addMessage(WorkerInfo workerInfo, int partitionId, + byte[] serializedId, int idSerializerPos, M message) { + return addData( + workerInfo, partitionId, serializedId, + idSerializerPos, message); + } + + /** * Gets the messages for a worker and removes it from the cache. * * @param workerInfo the address of the worker who owns the data @@ -78,7 +122,7 @@ public class SendMessageCache * @return List of pairs (partitionId, ByteArrayVertexIdMessages), * where all partition ids belong to workerInfo */ - public PairList> + protected PairList> removeWorkerMessages(WorkerInfo workerInfo) { return removeWorkerData(workerInfo); } @@ -88,8 +132,142 @@ public class SendMessageCache * * @return All vertex messages for all partitions */ - public PairList>> removeAllMessages() { return removeAllData(); } + + /** + * Send a message to a target vertex id. + * + * @param destVertexId Target vertex id + * @param message The message sent to the target + */ + public void sendMessageRequest(I destVertexId, M message) { + PartitionOwner owner = + getServiceWorker().getVertexPartitionOwner(destVertexId); + WorkerInfo workerInfo = owner.getWorkerInfo(); + final int partitionId = owner.getPartitionId(); + if (LOG.isTraceEnabled()) { + LOG.trace("sendMessageRequest: Send bytes (" + message.toString() + + ") to " + destVertexId + " on worker " + workerInfo); + } + ++totalMsgsSentInSuperstep; + // Add the message to the cache + int workerMessageSize = addMessage( + workerInfo, partitionId, destVertexId, message); + // Send a request if the cache of outgoing message to + // the remote worker 'workerInfo' is full enough to be flushed + if (workerMessageSize >= maxMessagesSizePerWorker) { + PairList> + workerMessages = removeWorkerMessages(workerInfo); + WritableRequest writableRequest = + new SendWorkerMessagesRequest(workerMessages); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest(workerInfo, writableRequest); + // Notify sending + getServiceWorker().getGraphTaskManager().notifySentMessages(); + } + } + + /** + * An iterator wrapper on edges to return + * target vertex ids. + */ + private class TargetVertexIdIterator implements Iterator { + /** An edge iterator */ + private Iterator> edgesIterator; + + /** + * Constructor. + * + * @param vertex The source vertex of the out edges + */ + private TargetVertexIdIterator(Vertex vertex) { + edgesIterator = + ((Vertex) vertex).getEdges().iterator(); + } + + @Override + public boolean hasNext() { + return edgesIterator.hasNext(); + } + + @Override + public I next() { + return edgesIterator.next().getTargetVertexId(); + } + + @Override + public void remove() { + // No operation. + } + } + + /** + * Send message to all its neighbors + * + * @param vertex The source vertex + * @param message The message sent to a worker + */ + public void sendMessageToAllRequest(Vertex vertex, M message) { + TargetVertexIdIterator targetVertexIterator = + new TargetVertexIdIterator(vertex); + sendMessageToAllRequest(targetVertexIterator, message); + } + + /** + * Send message to the target ids in the iterator + * + * @param vertexIdIterator The iterator of target vertex ids + * @param message The message sent to a worker + */ + public void sendMessageToAllRequest(Iterator vertexIdIterator, M message) { + while (vertexIdIterator.hasNext()) { + sendMessageRequest(vertexIdIterator.next(), message); + } + } + + /** + * Flush the rest of the messages to the workers. + */ + public void flush() { + PairList>> + remainingMessageCache = removeAllMessages(); + PairList>>.Iterator + iterator = remainingMessageCache.getIterator(); + while (iterator.hasNext()) { + iterator.next(); + WritableRequest writableRequest = + new SendWorkerMessagesRequest( + iterator.getCurrentSecond()); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest( + iterator.getCurrentFirst(), writableRequest); + } + } + + /** + * Reset the message count per superstep. + * + * @return The message count sent in last superstep + */ + public long resetMessageCount() { + long messagesSentInSuperstep = totalMsgsSentInSuperstep; + totalMsgsSentInSuperstep = 0; + return messagesSentInSuperstep; + } + + /** + * Reset the message bytes count per superstep. + * + * @return The message count sent in last superstep + */ + public long resetMessageBytesCount() { + long messageBytesSentInSuperstep = totalMsgBytesSentInSuperstep; + totalMsgBytesSentInSuperstep = 0; + return messageBytesSentInSuperstep; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index a50f673..39bf504 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -80,6 +80,8 @@ public class ServerData serviceWorker; /** * Constructor. @@ -95,6 +97,7 @@ public class ServerData> messageStoreFactory, Mapper.Context context) { + this.serviceWorker = service; this.conf = conf; this.messageStoreFactory = messageStoreFactory; if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) { @@ -188,4 +191,13 @@ public class ServerData getServiceWorker() { + return this.serviceWorker; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java index 89fb3e4..9bdf9ca 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.IOException; +import java.util.Iterator; /** * Aggregates IPC requests and sends them off @@ -41,9 +42,24 @@ public interface WorkerClientRequestProcessor vertex, Writable message); + + /** + * Sends a message to the targets in the iterator. + * + * @param vertexIdIterator The iterator of target vertex ids. + * @param message Message to send. + */ + void sendMessageToAllRequest(Iterator vertexIdIterator, Writable message); /** * Sends a vertex to the appropriate partition owner @@ -126,4 +142,11 @@ public interface WorkerClientRequestProcessor getMessagesAsIterable(M message) { return Collections.singleton(message); http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index 7ce0083..34a3d1f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -21,6 +21,7 @@ import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.SendEdgeCache; import org.apache.giraph.comm.SendMessageCache; +import org.apache.giraph.comm.SendMessageToAllCache; import org.apache.giraph.comm.SendMutationsCache; import org.apache.giraph.comm.SendPartitionCache; import org.apache.giraph.comm.ServerData; @@ -31,7 +32,6 @@ import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest; import org.apache.giraph.comm.requests.SendPartitionMutationsRequest; import org.apache.giraph.comm.requests.SendVertexRequest; import org.apache.giraph.comm.requests.SendWorkerEdgesRequest; -import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; import org.apache.giraph.comm.requests.WorkerRequest; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -57,6 +57,7 @@ import com.yammer.metrics.core.Gauge; import com.yammer.metrics.util.PercentGauge; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE; @@ -90,8 +91,6 @@ public class NettyWorkerClientRequestProcessor(); /** NettyClient that could be shared among one or more instances */ private final WorkerClient workerClient; - /** Messages sent during the last superstep */ - private long totalMsgsSentInSuperstep = 0; /** Maximum size of messages per remote worker to cache before sending */ private final int maxMessagesSizePerWorker; /** Maximum size of edges per remote worker to cache before sending. */ @@ -126,9 +125,17 @@ public class NettyWorkerClientRequestProcessor(context, conf); - sendMessageCache = new SendMessageCache(conf, serviceWorker); sendEdgeCache = new SendEdgeCache(conf, serviceWorker); maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf); + if (this.configuration.isOneToAllMsgSendingEnabled()) { + sendMessageCache = + new SendMessageToAllCache(conf, serviceWorker, + this, maxMessagesSizePerWorker); + } else { + sendMessageCache = + new SendMessageCache(conf, serviceWorker, + this, maxMessagesSizePerWorker); + } maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf); maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf); this.serviceWorker = serviceWorker; @@ -159,34 +166,20 @@ public class NettyWorkerClientRequestProcessor= maxMessagesSizePerWorker) { - PairList> - workerMessages = - sendMessageCache.removeWorkerMessages(workerInfo); - WritableRequest writableRequest = - new SendWorkerMessagesRequest(workerMessages); - doRequest(workerInfo, writableRequest); - return true; - } + @Override + public void sendMessageToAllRequest( + Vertex vertex, Writable message) { + this.sendMessageCache.sendMessageToAllRequest(vertex, message); + } - return false; + @Override + public void sendMessageToAllRequest( + Iterator vertexIdIterator, Writable message) { + this.sendMessageCache.sendMessageToAllRequest(vertexIdIterator, message); } @Override @@ -405,19 +398,8 @@ public class NettyWorkerClientRequestProcessor>> - remainingMessageCache = sendMessageCache.removeAllMessages(); - PairList>>.Iterator - iterator = remainingMessageCache.getIterator(); - while (iterator.hasNext()) { - iterator.next(); - WritableRequest writableRequest = - new SendWorkerMessagesRequest( - iterator.getCurrentSecond()); - doRequest(iterator.getCurrentFirst(), writableRequest); - } + // including one-to-one and one-to-all messages. + sendMessageCache.flush(); // Execute the remaining sends edges (if any) PairList vertex, M2 message) { - for (Edge edge : vertex.getEdges()) { - sendMessage(edge.getTargetVertexId(), message); - } + workerClientRequestProcessor.sendMessageToAllRequest(vertex, message); + } + + /** + * Send a message to multiple target vertex ids in the iterator. + * + * @param vertexIdIterator An iterator to multiple target vertex ids. + * @param message Message sent to all targets in the iterator. + */ + public void sendMessageToMultipleEdges( + Iterator vertexIdIterator, M2 message) { + workerClientRequestProcessor.sendMessageToAllRequest( + vertexIdIterator, message); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index a9bf3fd..77d9f5e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -96,6 +96,8 @@ public class ComputeCallable 0) { + long partitionMsgBytes = + workerClientRequestProcessor.resetMessageBytesCount(); + partitionStatsList.get(partitionStatsList.size() - 1). + addMessageBytesSentCount(partitionMsgBytes); + messageBytesSentCounter.inc(partitionMsgBytes); + } aggregatorUsage.finishThreadComputation(); } catch (IOException e) { throw new IllegalStateException("call: Flushing failed.", e); @@ -211,7 +228,7 @@ public class ComputeCallable computation, Partition partition) throws IOException, InterruptedException { PartitionStats partitionStats = - new PartitionStats(partition.getId(), 0, 0, 0, 0); + new PartitionStats(partition.getId(), 0, 0, 0, 0, 0); // Make sure this is thread-safe across runs synchronized (partition) { for (Vertex vertex : partition) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java index f3cbea2..bc56c9c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java @@ -37,6 +37,8 @@ public class GlobalStats implements Writable { private long edgeCount = 0; /** All messages sent in the last superstep */ private long messageCount = 0; + /** All message bytes sent in the last superstep */ + private long messageBytesCount = 0; /** Whether the computation should be halted */ private boolean haltComputation = false; @@ -67,6 +69,10 @@ public class GlobalStats implements Writable { return messageCount; } + public long getMessageBytesCount() { + return messageBytesCount; + } + public boolean getHaltComputation() { return haltComputation; } @@ -84,12 +90,22 @@ public class GlobalStats implements Writable { this.messageCount += messageCount; } + /** + * Add messages to the global stats. + * + * @param msgBytesCount Number of message bytes to be added. + */ + public void addMessageBytesCount(long msgBytesCount) { + this.messageBytesCount += msgBytesCount; + } + @Override public void readFields(DataInput input) throws IOException { vertexCount = input.readLong(); finishedVertexCount = input.readLong(); edgeCount = input.readLong(); messageCount = input.readLong(); + messageBytesCount = input.readLong(); haltComputation = input.readBoolean(); } @@ -99,6 +115,7 @@ public class GlobalStats implements Writable { output.writeLong(finishedVertexCount); output.writeLong(edgeCount); output.writeLong(messageCount); + output.writeLong(messageBytesCount); output.writeBoolean(haltComputation); } @@ -106,6 +123,7 @@ public class GlobalStats implements Writable { public String toString() { return "(vtx=" + vertexCount + ",finVtx=" + finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" + - messageCount + ",haltComputation=" + haltComputation + ")"; + messageCount + ",msgBytesCount=" + + messageBytesCount + ",haltComputation=" + haltComputation + ")"; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 1d3cff0..d08495b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -954,6 +954,9 @@ public class BspServiceMaster } /** + * Add a serialized vertex id and data. + * + * @param serializedId The bye array which holds the serialized id. + * @param idPos The end position of the serialized id in the byte array. + * @param data Data + */ + public void add(byte[] serializedId, int idPos, T data) { + try { + extendedDataOutput.write(serializedId, 0, idPos); + writeData(extendedDataOutput, data); + } catch (IOException e) { + throw new IllegalStateException("add: IOException", e); + } + } + + /** * Get the number of bytes used. * * @return Bytes used http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 52bac3f..da1e7fb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -573,7 +573,7 @@ public class BspServiceWorker partitionStatsList, long workerSentMessages) { + List partitionStatsList, long workerSentMessages, + long workerSentMessageBytes) { Collection finalizedPartitionStats = workerGraphPartitioner.finalizePartitionStats( partitionStatsList, getPartitionStore()); @@ -873,6 +880,8 @@ public class BspServiceWorker + dataToSend = new ByteArrayOneToAllMessages< + IntWritable, IntWritable>(new TestMessageValueFactory(IntWritable.class)); + dataToSend.setConf(conf); + dataToSend.initialize(); + ExtendedDataOutput output = conf.createExtendedDataOutput(); + for (int i = 1; i <= 7; ++i) { + IntWritable vertexId = new IntWritable(i); + vertexId.write(output); + } + dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1)); + + // Send the request + SendWorkerOneToAllMessagesRequest request = + new SendWorkerOneToAllMessagesRequest(dataToSend, conf); + client.sendWritableRequest(workerInfo.getTaskId(), request); + client.waitAllRequests(); + + // Stop the service + client.stop(); + server.stop(); + + // Check the output + Iterable vertices = + serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); + int keySum = 0; + int messageSum = 0; + for (IntWritable vertexId : vertices) { + keySum += vertexId.get(); + Iterable messages = + serverData.getIncomingMessageStore().getVertexMessages( + vertexId); + synchronized (messages) { + for (IntWritable message : messages) { + messageSum += message.get(); + } + } + } + assertEquals(28, keySum); + assertEquals(7, messageSum); + } + + @Test public void sendPartitionMutationsRequest() throws IOException { // Data to send int partitionId = 19; http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java index bc5b5e2..97e88f8 100644 --- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java +++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java @@ -30,6 +30,10 @@ import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.BasicPartitionOwner; import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.partition.SimplePartition; +import org.apache.giraph.partition.SimplePartitionStore; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -94,6 +98,10 @@ public class MockUtils { Mockito.verify(workerClientRequestProcessor).sendMessageRequest (targetVertexId, message); } + + public void verifyMessageSentToAllEdges(Vertex vertex, M message) { + Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message); + } /** assert that the test vertex has sent no message to a particular vertex */ public void verifyNoMessageSent() { @@ -149,6 +157,12 @@ public class MockUtils { return env; } + /** + * Prepare a mocked CentralizedServiceWorker. + * + * @param numOfPartitions The number of partitions + * @return CentralizedServiceWorker + */ public static CentralizedServiceWorker mockServiceGetVertexPartitionOwner(final int numOfPartitions) { @@ -167,14 +181,24 @@ public class MockUtils { return service; } + /** + * Prepare a ServerData object. + * + * @param conf Configuration + * @param context Context + * @return ServerData + */ public static ServerData - createNewServerData(ImmutableClassesGiraphConfiguration conf, - Mapper.Context context) { - return new ServerData( - Mockito.mock(CentralizedServiceWorker.class), - conf, - ByteArrayMessagesPerVertexStore.newFactory( - MockUtils.mockServiceGetVertexPartitionOwner(1), conf), - context); + createNewServerData( + ImmutableClassesGiraphConfiguration conf, Mapper.Context context) { + CentralizedServiceWorker serviceWorker = + MockUtils.mockServiceGetVertexPartitionOwner(1); + ServerData serverData = + new ServerData( + serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory( + serviceWorker, conf), context); + // Here we add a partition to simulate the case that there is one partition. + serverData.getPartitionStore().addPartition(new SimplePartition()); + return serverData; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d31949a1/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java index 7346745..b42d504 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingComputationTest.java @@ -59,10 +59,8 @@ public class SimpleTriangleClosingComputationTest { computation.compute(vertex, Lists.newArrayList( new IntWritable(83), new IntWritable(42))); - env.verifyMessageSent(new IntWritable(5), new IntWritable(5)); - env.verifyMessageSent(new IntWritable(5), new IntWritable(7)); - env.verifyMessageSent(new IntWritable(7), new IntWritable(5)); - env.verifyMessageSent(new IntWritable(7), new IntWritable(7)); + env.verifyMessageSentToAllEdges(vertex, new IntWritable(5)); + env.verifyMessageSentToAllEdges(vertex, new IntWritable(7)); } /** Test behavior of compute() with incoming messages (superstep 1) */