Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6AA3417502 for ; Sun, 26 Oct 2014 01:21:46 +0000 (UTC) Received: (qmail 80766 invoked by uid 500); 26 Oct 2014 01:21:45 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 80642 invoked by uid 500); 26 Oct 2014 01:21:45 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 80132 invoked by uid 99); 26 Oct 2014 01:21:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Oct 2014 01:21:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 29E968A0252; Sun, 26 Oct 2014 01:21:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rvs@apache.org To: commits@giraph.apache.org Date: Sun, 26 Oct 2014 01:21:56 -0000 Message-Id: <4320acaa914048e489e867a1fff4b251@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/47] git commit: updated refs/heads/release-1.1 to 4c139ee GIRAPH-912: Support succinct representation of messages in messagestores (pavanka) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f31e9a32 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f31e9a32 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f31e9a32 Branch: refs/heads/release-1.1 Commit: f31e9a328d3b4f10906a10a8e69d2ae515e3aba0 Parents: 61cb37e Author: Pavan Kumar Authored: Wed Jul 9 17:08:48 2014 -0700 Committer: Pavan Kumar Committed: Wed Jul 9 17:08:48 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/comm/SendMessageToAllCache.java | 308 ------------------- .../giraph/comm/SendOneMessageToManyCache.java | 306 ++++++++++++++++++ .../java/org/apache/giraph/comm/ServerData.java | 2 + .../messages/AbstractListPerVertexStore.java | 103 +++++++ .../ByteArrayMessagesPerVertexStore.java | 17 +- .../messages/InMemoryMessageStoreFactory.java | 114 +++++-- .../messages/MessageEncodeAndStoreType.java | 59 ++++ .../giraph/comm/messages/MessageStore.java | 15 + .../comm/messages/OneMessagePerVertexStore.java | 5 + .../messages/PointerListMessagesIterable.java | 105 +++++++ .../messages/PointerListPerVertexStore.java | 137 +++++++++ .../comm/messages/SimpleMessageStore.java | 4 + .../out_of_core/DiskBackedMessageStore.java | 9 + .../primitives/IntByteArrayMessageStore.java | 9 + .../primitives/IntFloatMessageStore.java | 9 + .../primitives/LongByteArrayMessageStore.java | 241 --------------- .../primitives/LongDoubleMessageStore.java | 9 + .../long_id/LongAbstractListMessageStore.java | 164 ++++++++++ .../long_id/LongAbstractMessageStore.java | 132 ++++++++ .../long_id/LongByteArrayMessageStore.java | 172 +++++++++++ .../long_id/LongPointerListMessageStore.java | 129 ++++++++ .../primitives/long_id/package-info.java | 22 ++ .../NettyWorkerClientRequestProcessor.java | 8 +- .../giraph/comm/netty/NettyWorkerServer.java | 2 +- .../giraph/comm/requests/RequestType.java | 6 +- .../SendWorkerOneMessageToManyRequest.java | 156 ++++++++++ .../SendWorkerOneToAllMessagesRequest.java | 155 ---------- .../apache/giraph/conf/GiraphConfiguration.java | 17 +- .../org/apache/giraph/conf/GiraphConstants.java | 20 +- .../utils/ByteArrayOneMessageToManyIds.java | 105 +++++++ .../giraph/utils/ByteArrayOneToAllMessages.java | 168 ---------- .../utils/ByteStructVertexIdDataIterator.java | 9 + .../ByteStructVertexIdMessageIterator.java | 10 + .../utils/ExtendedByteArrayOutputBuffer.java | 155 ++++++++++ .../apache/giraph/utils/ExtendedDataOutput.java | 1 - .../utils/OneMessageToManyIdsIterator.java | 143 +++++++++ .../apache/giraph/utils/UnsafeArrayReads.java | 2 +- .../org/apache/giraph/utils/UnsafeReads.java | 2 +- .../utils/UnsafeReusableByteArrayInput.java | 46 +++ .../giraph/utils/VertexIdDataIterator.java | 7 + .../giraph/utils/VertexIdMessageIterator.java | 14 + .../org/apache/giraph/comm/RequestTest.java | 14 +- .../TestLongDoublePrimitiveMessageStores.java | 2 +- 44 files changed, 2160 insertions(+), 955 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 13dfcd7..0263749 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-912: Support succinct representation of messages in messagestores (pavanka) + GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka) GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java deleted file mode 100644 index 60858ea..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; -import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; -import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest; -import org.apache.giraph.comm.requests.WritableRequest; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.utils.ByteArrayOneToAllMessages; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.giraph.utils.ExtendedDataOutput; -import org.apache.giraph.utils.PairList; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -/** - * Aggregates the messages to be sent to workers so they can be sent - * in bulk. Not thread-safe. - * - * @param Vertex id - * @param Message data - */ -public class SendMessageToAllCache extends SendMessageCache { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(SendMessageToAllCache.class); - /** Cache serialized messages for each worker */ - private final ByteArrayOneToAllMessages[] oneToAllMsgCache; - /** Tracking one-to-all message sizes for each worker */ - private final int[] oneToAllMsgSizes; - /** Reused byte array to serialize target ids on each worker */ - private final ExtendedDataOutput[] idSerializer; - /** Reused int array to count target id distribution */ - private final int[] idCounter; - /** - * Reused int array to record the partition id - * of the first target vertex id found on the worker. - */ - private final int[] firstPartitionMap; - /** The WorkerInfo list */ - private final WorkerInfo[] workerInfoList; - - /** - * Constructor - * - * @param conf Giraph configuration - * @param serviceWorker Service worker - * @param processor NettyWorkerClientRequestProcessor - * @param maxMsgSize Max message size sent to a worker - */ - public SendMessageToAllCache(ImmutableClassesGiraphConfiguration conf, - CentralizedServiceWorker serviceWorker, - NettyWorkerClientRequestProcessor processor, - int maxMsgSize) { - super(conf, serviceWorker, processor, maxMsgSize); - int numWorkers = getNumWorkers(); - oneToAllMsgCache = new ByteArrayOneToAllMessages[numWorkers]; - oneToAllMsgSizes = new int[numWorkers]; - idSerializer = new ExtendedDataOutput[numWorkers]; - // InitialBufferSizes is alo initialized based on the number of workers. - // As a result, initialBufferSizes is the same as idSerializer in length - int initialBufferSize = 0; - for (int i = 0; i < this.idSerializer.length; i++) { - initialBufferSize = getSendWorkerInitialBufferSize(i); - if (initialBufferSize > 0) { - // InitialBufferSizes is from super class. - // Each element is for one worker. - idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize); - } - } - idCounter = new int[numWorkers]; - firstPartitionMap = new int[numWorkers]; - // Get worker info list. - workerInfoList = new WorkerInfo[numWorkers]; - // Remember there could be null in the array. - for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { - workerInfoList[workerInfo.getTaskId()] = workerInfo; - } - } - - /** - * Reset ExtendedDataOutput array for id serialization - * in next "one-to-all" message sending. - */ - private void resetIdSerializers() { - for (int i = 0; i < this.idSerializer.length; i++) { - if (idSerializer[i] != null) { - idSerializer[i].reset(); - } - } - } - - /** - * Reset id counter for next "one-to-all" message sending. - */ - private void resetIdCounter() { - Arrays.fill(idCounter, 0); - } - - /** - * Add message with multiple ids to - * one-to-all message cache. - * - * @param workerInfo The remote worker destination - * @param ids A byte array to hold serialized vertex ids - * @param idPos The end position of ids - * information in the byte array above - * @param count The number of target ids - * @param message Message to send to remote worker - * @return The size of messages for the worker. - */ - private int addOneToAllMessage( - WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) { - // Get the data collection - ByteArrayOneToAllMessages workerData = - oneToAllMsgCache[workerInfo.getTaskId()]; - if (workerData == null) { - workerData = new ByteArrayOneToAllMessages( - getConf().getOutgoingMessageValueFactory()); - workerData.setConf(getConf()); - workerData.initialize(getSendWorkerInitialBufferSize( - workerInfo.getTaskId())); - oneToAllMsgCache[workerInfo.getTaskId()] = workerData; - } - workerData.add(ids, idPos, count, message); - // Update the size of cached, outgoing data per worker - oneToAllMsgSizes[workerInfo.getTaskId()] = - workerData.getSize(); - return oneToAllMsgSizes[workerInfo.getTaskId()]; - } - - /** - * Gets the one-to-all - * messages for a worker and removes it from the cache. - * Here the ByteArrayOneToAllMessages returned could be null. - * But when invoking this method, we also check if the data size sent - * to this worker is above the threshold. Therefore, it doesn't matter - * if the result is null or not. - * - * @param workerInfo The target worker where one-to-all messages - * go to. - * @return ByteArrayOneToAllMessages that belong to the workerInfo - */ - private ByteArrayOneToAllMessages - removeWorkerOneToAllMessages(WorkerInfo workerInfo) { - ByteArrayOneToAllMessages workerData = - oneToAllMsgCache[workerInfo.getTaskId()]; - if (workerData != null) { - oneToAllMsgCache[workerInfo.getTaskId()] = null; - oneToAllMsgSizes[workerInfo.getTaskId()] = 0; - } - return workerData; - } - - /** - * Gets all the one-to-all - * messages and removes them from the cache. - * - * @return All vertex messages for all workers - */ - private PairList> - removeAllOneToAllMessages() { - PairList> allData = - new PairList>(); - allData.initialize(oneToAllMsgCache.length); - for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) { - ByteArrayOneToAllMessages workerData = - removeWorkerOneToAllMessages(workerInfo); - if (workerData != null && !workerData.isEmpty()) { - allData.add(workerInfo, workerData); - } - } - return allData; - } - - @Override - public void sendMessageToAllRequest(Iterator vertexIdIterator, M message) { - // This is going to be reused through every message sending - resetIdSerializers(); - resetIdCounter(); - // Count messages - int currentMachineId = 0; - PartitionOwner owner = null; - WorkerInfo workerInfo = null; - I vertexId = null; - while (vertexIdIterator.hasNext()) { - vertexId = vertexIdIterator.next(); - owner = getServiceWorker().getVertexPartitionOwner(vertexId); - workerInfo = owner.getWorkerInfo(); - currentMachineId = workerInfo.getTaskId(); - // Serialize this target vertex id - try { - vertexId.write(idSerializer[currentMachineId]); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to serialize the target vertex id."); - } - idCounter[currentMachineId]++; - // Record the first partition id in the worker which message send to. - // If idCounter shows there is only one target on this worker - // then this is the partition number of the target vertex. - if (idCounter[currentMachineId] == 1) { - firstPartitionMap[currentMachineId] = owner.getPartitionId(); - } - } - // Add the message to the cache - int idSerializerPos = 0; - int workerMessageSize = 0; - byte[] serializedId = null; - WritableRequest writableRequest = null; - for (int i = 0; i < idCounter.length; i++) { - if (idCounter[i] == 1) { - serializedId = idSerializer[i].getByteArray(); - idSerializerPos = idSerializer[i].getPos(); - // Add the message to the cache - workerMessageSize = addMessage(workerInfoList[i], - firstPartitionMap[i], serializedId, idSerializerPos, message); - - if (LOG.isTraceEnabled()) { - LOG.trace("sendMessageToAllRequest: Send bytes (" + - message.toString() + ") to one target in worker " + - workerInfoList[i]); - } - ++totalMsgsSentInSuperstep; - if (workerMessageSize >= maxMessagesSizePerWorker) { - PairList> - workerMessages = removeWorkerMessages(workerInfoList[i]); - writableRequest = - new SendWorkerMessagesRequest(workerMessages); - totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); - clientProcessor.doRequest(workerInfoList[i], writableRequest); - // Notify sending - getServiceWorker().getGraphTaskManager().notifySentMessages(); - } - } else if (idCounter[i] > 1) { - serializedId = idSerializer[i].getByteArray(); - idSerializerPos = idSerializer[i].getPos(); - workerMessageSize = addOneToAllMessage( - workerInfoList[i], serializedId, idSerializerPos, idCounter[i], - message); - - if (LOG.isTraceEnabled()) { - LOG.trace("sendMessageToAllRequest: Send bytes (" + - message.toString() + ") to all targets in worker" + - workerInfoList[i]); - } - totalMsgsSentInSuperstep += idCounter[i]; - if (workerMessageSize >= maxMessagesSizePerWorker) { - ByteArrayOneToAllMessages workerOneToAllMessages = - removeWorkerOneToAllMessages(workerInfoList[i]); - writableRequest = - new SendWorkerOneToAllMessagesRequest( - workerOneToAllMessages, getConf()); - totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); - clientProcessor.doRequest(workerInfoList[i], writableRequest); - // Notify sending - getServiceWorker().getGraphTaskManager().notifySentMessages(); - } - } - } - } - - @Override - public void flush() { - super.flush(); - PairList> - remainingOneToAllMessageCache = - removeAllOneToAllMessages(); - PairList>.Iterator - oneToAllMsgIterator = remainingOneToAllMessageCache.getIterator(); - while (oneToAllMsgIterator.hasNext()) { - oneToAllMsgIterator.next(); - WritableRequest writableRequest = - new SendWorkerOneToAllMessagesRequest( - oneToAllMsgIterator.getCurrentSecond(), getConf()); - totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); - clientProcessor.doRequest( - oneToAllMsgIterator.getCurrentFirst(), writableRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java new file mode 100644 index 0000000..c67a20b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; +import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest; +import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; +import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.PairList; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Aggregates the messages to be sent to workers so they can be sent + * in bulk. + * + * @param Vertex id + * @param Message data + */ +@NotThreadSafe +@SuppressWarnings("unchecked") +public class SendOneMessageToManyCache extends SendMessageCache { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SendOneMessageToManyCache.class); + /** Cache serialized one to many messages for each worker */ + private final ByteArrayOneMessageToManyIds[] msgVidsCache; + /** Tracking message-vertexIds sizes for each worker */ + private final int[] msgVidsSizes; + /** Reused byte array to serialize target ids on each worker */ + private final ExtendedDataOutput[] idSerializer; + /** Reused int array to count target id distribution */ + private final int[] idCounter; + /** + * Reused int array to record the partition id + * of the first target vertex id found on the worker. + */ + private final int[] firstPartitionMap; + /** The WorkerInfo list */ + private final WorkerInfo[] workerInfoList; + + /** + * Constructor + * + * @param conf Giraph configuration + * @param serviceWorker Service worker + * @param processor NettyWorkerClientRequestProcessor + * @param maxMsgSize Max message size sent to a worker + */ + public SendOneMessageToManyCache(ImmutableClassesGiraphConfiguration conf, + CentralizedServiceWorker serviceWorker, + NettyWorkerClientRequestProcessor processor, + int maxMsgSize) { + super(conf, serviceWorker, processor, maxMsgSize); + int numWorkers = getNumWorkers(); + msgVidsCache = new ByteArrayOneMessageToManyIds[numWorkers]; + msgVidsSizes = new int[numWorkers]; + idSerializer = new ExtendedDataOutput[numWorkers]; + // InitialBufferSizes is alo initialized based on the number of workers. + // As a result, initialBufferSizes is the same as idSerializer in length + int initialBufferSize = 0; + for (int i = 0; i < this.idSerializer.length; i++) { + initialBufferSize = getSendWorkerInitialBufferSize(i); + if (initialBufferSize > 0) { + // InitialBufferSizes is from super class. + // Each element is for one worker. + idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize); + } + } + idCounter = new int[numWorkers]; + firstPartitionMap = new int[numWorkers]; + // Get worker info list. + workerInfoList = new WorkerInfo[numWorkers]; + // Remember there could be null in the array. + for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { + workerInfoList[workerInfo.getTaskId()] = workerInfo; + } + } + + /** + * Reset ExtendedDataOutput array for id serialization + * in next message-Vids encoding + */ + private void resetIdSerializers() { + for (int i = 0; i < this.idSerializer.length; i++) { + if (idSerializer[i] != null) { + idSerializer[i].reset(); + } + } + } + + /** + * Reset id counter for next message-vertexIds encoding + */ + private void resetIdCounter() { + Arrays.fill(idCounter, 0); + } + + /** + * Add message with multiple target ids to message cache. + * + * @param workerInfo The remote worker destination + * @param ids A byte array to hold serialized vertex ids + * @param idPos The end position of ids + * information in the byte array above + * @param count The number of target ids + * @param message Message to send to remote worker + * @return The size of messages for the worker. + */ + private int addOneToManyMessage( + WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) { + // Get the data collection + ByteArrayOneMessageToManyIds workerData = + msgVidsCache[workerInfo.getTaskId()]; + if (workerData == null) { + workerData = new ByteArrayOneMessageToManyIds( + getConf().getOutgoingMessageValueFactory()); + workerData.setConf(getConf()); + workerData.initialize(getSendWorkerInitialBufferSize( + workerInfo.getTaskId())); + msgVidsCache[workerInfo.getTaskId()] = workerData; + } + workerData.add(ids, idPos, count, message); + // Update the size of cached, outgoing data per worker + msgVidsSizes[workerInfo.getTaskId()] = + workerData.getSize(); + return msgVidsSizes[workerInfo.getTaskId()]; + } + + /** + * Gets the messages + vertexIds for a worker and removes it from the cache. + * Here the {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds} + * returned could be null.But when invoking this method, we also check if + * the data size sent to this worker is above the threshold. + * Therefore, it doesn't matter if the result is null or not. + * + * @param workerInfo Target worker to which one messages - many ids are sent + * @return {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds} + * that belong to the workerInfo + */ + private ByteArrayOneMessageToManyIds + removeWorkerMsgVids(WorkerInfo workerInfo) { + ByteArrayOneMessageToManyIds workerData = + msgVidsCache[workerInfo.getTaskId()]; + if (workerData != null) { + msgVidsCache[workerInfo.getTaskId()] = null; + msgVidsSizes[workerInfo.getTaskId()] = 0; + } + return workerData; + } + + /** + * Gets all messages - vertexIds and removes them from the cache. + * + * @return All vertex messages for all workers + */ + private PairList> + removeAllMsgVids() { + PairList> allData = + new PairList>(); + allData.initialize(msgVidsCache.length); + for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) { + ByteArrayOneMessageToManyIds workerData = + removeWorkerMsgVids(workerInfo); + if (workerData != null && !workerData.isEmpty()) { + allData.add(workerInfo, workerData); + } + } + return allData; + } + + @Override + public void sendMessageToAllRequest(Iterator vertexIdIterator, M message) { + // This is going to be reused through every message sending + resetIdSerializers(); + resetIdCounter(); + // Count messages + int currentMachineId = 0; + PartitionOwner owner = null; + WorkerInfo workerInfo = null; + I vertexId = null; + while (vertexIdIterator.hasNext()) { + vertexId = vertexIdIterator.next(); + owner = getServiceWorker().getVertexPartitionOwner(vertexId); + workerInfo = owner.getWorkerInfo(); + currentMachineId = workerInfo.getTaskId(); + // Serialize this target vertex id + try { + vertexId.write(idSerializer[currentMachineId]); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to serialize the target vertex id."); + } + idCounter[currentMachineId]++; + // Record the first partition id in the worker which message send to. + // If idCounter shows there is only one target on this worker + // then this is the partition number of the target vertex. + if (idCounter[currentMachineId] == 1) { + firstPartitionMap[currentMachineId] = owner.getPartitionId(); + } + } + // Add the message to the cache + int idSerializerPos = 0; + int workerMessageSize = 0; + byte[] serializedId = null; + WritableRequest writableRequest = null; + for (int i = 0; i < idCounter.length; i++) { + if (idCounter[i] == 1) { + serializedId = idSerializer[i].getByteArray(); + idSerializerPos = idSerializer[i].getPos(); + // Add the message to the cache + workerMessageSize = addMessage(workerInfoList[i], + firstPartitionMap[i], serializedId, idSerializerPos, message); + + if (LOG.isTraceEnabled()) { + LOG.trace("sendMessageToAllRequest: Send bytes (" + + message.toString() + ") to one target in worker " + + workerInfoList[i]); + } + ++totalMsgsSentInSuperstep; + if (workerMessageSize >= maxMessagesSizePerWorker) { + PairList> + workerMessages = removeWorkerMessages(workerInfoList[i]); + writableRequest = new SendWorkerMessagesRequest<>(workerMessages); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest(workerInfoList[i], writableRequest); + // Notify sending + getServiceWorker().getGraphTaskManager().notifySentMessages(); + } + } else if (idCounter[i] > 1) { + serializedId = idSerializer[i].getByteArray(); + idSerializerPos = idSerializer[i].getPos(); + workerMessageSize = addOneToManyMessage( + workerInfoList[i], serializedId, idSerializerPos, idCounter[i], + message); + + if (LOG.isTraceEnabled()) { + LOG.trace("sendMessageToAllRequest: Send bytes (" + + message.toString() + ") to all targets in worker" + + workerInfoList[i]); + } + totalMsgsSentInSuperstep += idCounter[i]; + if (workerMessageSize >= maxMessagesSizePerWorker) { + ByteArrayOneMessageToManyIds workerMsgVids = + removeWorkerMsgVids(workerInfoList[i]); + writableRequest = new SendWorkerOneMessageToManyRequest<>( + workerMsgVids, getConf()); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest(workerInfoList[i], writableRequest); + // Notify sending + getServiceWorker().getGraphTaskManager().notifySentMessages(); + } + } + } + } + + @Override + public void flush() { + super.flush(); + PairList> + remainingMsgVidsCache = removeAllMsgVids(); + PairList>.Iterator + msgIdsIterator = remainingMsgVidsCache.getIterator(); + while (msgIdsIterator.hasNext()) { + msgIdsIterator.next(); + WritableRequest writableRequest = + new SendWorkerOneMessageToManyRequest<>( + msgIdsIterator.getCurrentSecond(), getConf()); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest( + msgIdsIterator.getCurrentFirst(), writableRequest); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index b3f8733..85bfe04 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -176,6 +176,8 @@ public class ServerData Vertex id + * @param Message data + * @param List type + */ +public abstract class AbstractListPerVertexStore extends SimpleMessageStore { + + /** + * Constructor + * + * @param messageValueFactory Message class held in the store + * @param service Service worker + * @param config Hadoop configuration + */ + public AbstractListPerVertexStore( + MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration config) { + super(messageValueFactory, service, config); + } + + /** + * Create an instance of L + * @return instance of L + */ + protected abstract L createList(); + + /** + * Get the list of pointers for a vertex + * Each pointer has information of how to access an encoded message + * for this vertex + * + * @param iterator vertex id iterator + * @return pointer list + */ + protected L getOrCreateList(VertexIdIterator iterator) { + PartitionOwner owner = + service.getVertexPartitionOwner(iterator.getCurrentVertexId()); + int partitionId = owner.getPartitionId(); + ConcurrentMap partitionMap = getOrCreatePartitionMap(partitionId); + L list = partitionMap.get(iterator.getCurrentVertexId()); + if (list == null) { + L newList = createList(); + list = partitionMap.putIfAbsent( + iterator.releaseCurrentVertexId(), newList); + if (list == null) { + list = newList; + } + } + return list; + } + + @Override + public Iterable getVertexMessages(I vertexId) throws IOException { + ConcurrentMap partitionMap = + map.get(getPartitionId(vertexId)); + if (partitionMap == null) { + return Collections.emptyList(); + } + L list = partitionMap.get(vertexId); + return list == null ? Collections.emptyList() : + getMessagesAsIterable(list); + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java index 65939bb..57d255f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java @@ -62,6 +62,11 @@ public class ByteArrayMessagesPerVertexStore messages) throws IOException { + int partitionId, VertexIdMessages messages) throws IOException { ConcurrentMap partitionMap = getOrCreatePartitionMap(partitionId); - VertexIdMessageBytesIterator - vertexIdMessageBytesIterator = + VertexIdMessageBytesIterator vertexIdMessageBytesIterator = messages.getVertexIdMessageBytesIterator(); // Try to copy the message buffer over rather than // doing a deserialization of a message just to know its size. This @@ -113,8 +116,8 @@ public class ByteArrayMessagesPerVertexStore - vertexIdMessageIterator = messages.getVertexIdMessageIterator(); + VertexIdMessageIterator vertexIdMessageIterator = + messages.getVertexIdMessageIterator(); while (vertexIdMessageIterator.hasNext()) { vertexIdMessageIterator.next(); DataInputOutput dataInputOutput = @@ -188,7 +191,7 @@ public class ByteArrayMessagesPerVertexStore Message data */ private static class Factory - implements MessageStoreFactory> { + implements MessageStoreFactory> { /** Service worker */ private CentralizedServiceWorker service; /** Hadoop configuration */ http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index f691d3e..db22503 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -22,8 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore; -import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.DoubleWritable; @@ -43,6 +45,7 @@ import org.apache.log4j.Logger; * @param Vertex id * @param Message data */ +@SuppressWarnings("unchecked") public class InMemoryMessageStoreFactory implements MessageStoreFactory> { @@ -51,9 +54,9 @@ public class InMemoryMessageStoreFactory service; + protected CentralizedServiceWorker service; /** Hadoop configuration */ - private ImmutableClassesGiraphConfiguration conf; + protected ImmutableClassesGiraphConfiguration conf; /** * Default constructor allowing class invocation via Reflection. @@ -61,46 +64,89 @@ public class InMemoryMessageStoreFactory newStore( - MessageValueFactory messageValueFactory) { + /** + * MessageStore to be used when combiner is enabled + * + * @param messageValueFactory message value factory + * @return message store + */ + protected MessageStore newStoreWithCombiner( + MessageValueFactory messageValueFactory) { Class messageClass = messageValueFactory.getValueClass(); MessageStore messageStore; - if (conf.useMessageCombiner()) { - Class vertexIdClass = conf.getVertexIdClass(); - if (vertexIdClass.equals(IntWritable.class) && - messageClass.equals(FloatWritable.class)) { - messageStore = new IntFloatMessageStore( - (CentralizedServiceWorker) service, - (MessageCombiner) - conf.createMessageCombiner()); - } else if (vertexIdClass.equals(LongWritable.class) && - messageClass.equals(DoubleWritable.class)) { - messageStore = new LongDoubleMessageStore( + Class vertexIdClass = conf.getVertexIdClass(); + if (vertexIdClass.equals(IntWritable.class) && + messageClass.equals(FloatWritable.class)) { + messageStore = new IntFloatMessageStore( + (CentralizedServiceWorker) service, + (MessageCombiner) + conf.createMessageCombiner()); + } else if (vertexIdClass.equals(LongWritable.class) && + messageClass.equals(DoubleWritable.class)) { + messageStore = new LongDoubleMessageStore( (CentralizedServiceWorker) service, (MessageCombiner) conf.createMessageCombiner()); - } else { - messageStore = new OneMessagePerVertexStore(messageValueFactory, + } else { + messageStore = new OneMessagePerVertexStore(messageValueFactory, service, conf.createMessageCombiner(), conf); + } + return messageStore; + } + + /** + * MessageStore to be used when combiner is not enabled + * + * @param messageValueFactory message value factory + * @return message store + */ + protected MessageStore newStoreWithoutCombiner( + MessageValueFactory messageValueFactory) { + MessageStore messageStore = null; + MessageEncodeAndStoreType encodeAndStore = GiraphConstants + .MESSAGE_ENCODE_AND_STORE_TYPE.get(conf); + Class vertexIdClass = conf.getVertexIdClass(); + if (vertexIdClass.equals(IntWritable.class)) { // INT + messageStore = new IntByteArrayMessageStore(messageValueFactory, + service, conf); + } else if (vertexIdClass.equals(LongWritable.class)) { // LONG + if (encodeAndStore.equals( + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || + encodeAndStore.equals( + MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) { + messageStore = new LongByteArrayMessageStore(messageValueFactory, + service, conf); + } else if (encodeAndStore.equals( + MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { + messageStore = new LongPointerListMessageStore(messageValueFactory, + service, conf); } - } else { - Class vertexIdClass = conf.getVertexIdClass(); - if (vertexIdClass.equals(IntWritable.class)) { - messageStore = new IntByteArrayMessageStore(messageValueFactory, - (CentralizedServiceWorker) service, - (ImmutableClassesGiraphConfiguration) - conf); - } else if (vertexIdClass.equals(LongWritable.class)) { - messageStore = new LongByteArrayMessageStore(messageValueFactory, - (CentralizedServiceWorker) service, - (ImmutableClassesGiraphConfiguration) conf); - } else { - messageStore = new ByteArrayMessagesPerVertexStore( - messageValueFactory, service, conf); + } else { // GENERAL + if (encodeAndStore.equals( + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || + encodeAndStore.equals( + MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) { + messageStore = new ByteArrayMessagesPerVertexStore<>( + messageValueFactory, service, conf); + } else if (encodeAndStore.equals( + MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { + messageStore = new PointerListPerVertexStore(messageValueFactory, + service, conf); } } + return messageStore; + } + + @Override + public MessageStore newStore( + MessageValueFactory messageValueFactory) { + Class messageClass = messageValueFactory.getValueClass(); + MessageStore messageStore; + if (conf.useMessageCombiner()) { + messageStore = newStoreWithCombiner(messageValueFactory); + } else { + messageStore = newStoreWithoutCombiner(messageValueFactory); + } if (LOG.isInfoEnabled()) { LOG.info("newStore: Created " + messageStore.getClass() + http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java new file mode 100644 index 0000000..7a5b702 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.messages; + +/** + * There are two types of message-stores currently + * pointer based, and default byte-array based + */ +public enum MessageEncodeAndStoreType { + /** + * Use message-store which is based on list of pointers to encoded messages + */ + POINTER_LIST_PER_VERTEX(true), + /** + * Extract a byte array per partition from one message to many ids encoding + * and then store + */ + EXTRACT_BYTEARRAY_PER_PARTITION(true), + /** + * Use a byte-array to store messages for each partition + */ + BYTEARRAY_PER_PARTITION(false); + + /** Can use one message to many ids encoding? */ + private final boolean oneMessageToManyIdsEncoding; + + /** + * Constructor + * + * @param oneMessageToManyIdsEncoding use one message to many ids encoding + */ + MessageEncodeAndStoreType(boolean oneMessageToManyIdsEncoding) { + this.oneMessageToManyIdsEncoding = oneMessageToManyIdsEncoding; + } + + /** + * True if one message to many ids encoding is set + * @return return oneMessageToManyIdsEncoding + */ + public boolean useOneMessageToManyIdsEncoding() { + return oneMessageToManyIdsEncoding; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java index 7d0bbc6..6f1179a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java @@ -34,6 +34,14 @@ import org.apache.hadoop.io.WritableComparable; public interface MessageStore { /** + * True if this message-store encodes messages as a list of long pointers + * to compact serialized messages + * + * @return true if we encode messages as a list of pointers + */ + boolean isPointerListEncoding(); + + /** * Gets messages for a vertex. The lifetime of every message is only * guaranteed until the iterator's next() method is called. Do not hold * references to objects returned by this iterator. @@ -79,6 +87,13 @@ public interface MessageStore messages) throws IOException { http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java new file mode 100644 index 0000000..e5a1691 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.messages; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongListIterator; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.UnsafeReusableByteArrayInput; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Create an iterable for messages based on a pointer list + * + * @param messageType + */ +public class PointerListMessagesIterable + implements Iterable { + /** Message class */ + private final MessageValueFactory messageValueFactory; + /** List of pointers to messages in byte array */ + private final LongArrayList pointers; + /** Holds the byte arrays of serialized messages */ + private final ExtendedByteArrayOutputBuffer msgBuffer; + /** Reader to read data from byte buffer */ + private final UnsafeReusableByteArrayInput messageReader; + + /** + * + * @param messageValueFactory message value factory + * @param pointers pointers to messages in buffer + * @param msgBuffer holds the byte arrays of serialized messages + */ + public PointerListMessagesIterable(MessageValueFactory messageValueFactory, + LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) { + this.messageValueFactory = messageValueFactory; + this.pointers = pointers; + this.msgBuffer = msgBuffer; + // TODO - if needed implement same for Safe as well + messageReader = new UnsafeReusableByteArrayInput(); + } + + /** + * Create message from factory + * + * @return message instance + */ + protected M createMessage() { + return messageValueFactory.newInstance(); + } + + @Override + public Iterator iterator() { + return new Iterator() { + private final LongListIterator iterator = pointers.iterator(); + private final M reusableMsg = + PointerListMessagesIterable.this.createMessage(); + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public M next() { + long pointer = iterator.next(); + try { + int index = (int) (pointer >>> 32); + int offset = (int) pointer; + ExtendedDataOutput buffer = msgBuffer.getDataOutput(index); + messageReader.initialize(buffer.getByteArray(), offset, + buffer.getPos()); + reusableMsg.readFields(messageReader); + } catch (IOException e) { + throw new IllegalStateException("Got exception : " + e); + } + return reusableMsg; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java new file mode 100644 index 0000000..cce0439 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.messages; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.VertexIdMessageIterator; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut; + +/** + * Implementation of {@link SimpleMessageStore} where multiple messages are + * stored as a list of long pointers to extended data output objects + * Used when there is no combiner provided. + * + * @param vertexId type + * @param message type + */ +public class PointerListPerVertexStore extends AbstractListPerVertexStore { + + /** Buffers of byte array outputs used to store messages - thread safe */ + private final ExtendedByteArrayOutputBuffer bytesBuffer; + + /** + * Constructor + * + * @param messageValueFactory Message class held in the store + * @param service Service worker + * @param config Hadoop configuration + */ + public PointerListPerVertexStore( + MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration config) { + super(messageValueFactory, service, config); + bytesBuffer = new ExtendedByteArrayOutputBuffer(config); + } + + @Override + public boolean isPointerListEncoding() { + return true; + } + + @Override + protected LongArrayList createList() { + return new LongArrayList(); + } + + @Override + public void addPartitionMessages( + int partitionId, VertexIdMessages messages) throws IOException { + VertexIdMessageIterator vertexIdMessageIterator = + messages.getVertexIdMessageIterator(); + long pointer = 0; + LongArrayList list; + while (vertexIdMessageIterator.hasNext()) { + vertexIdMessageIterator.next(); + M msg = vertexIdMessageIterator.getCurrentMessage(); + list = getOrCreateList(vertexIdMessageIterator); + if (vertexIdMessageIterator.isNewMessage()) { + IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); + pointer = indexAndDataOut.getIndex(); + pointer <<= 32; + ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); + pointer += dataOutput.getPos(); + msg.write(dataOutput); + } + synchronized (list) { + list.add(pointer); + } + } + } + + /** + * Get messages as an iterable from message storage + * + * @param pointers list of pointers to messages + * @return Messages as an iterable + */ + @Override + public Iterable getMessagesAsIterable(LongArrayList pointers) { + return new PointerListMessagesIterable<>(messageValueFactory, pointers, + bytesBuffer); + } + + @Override + protected int getNumberOfMessagesIn(ConcurrentMap partitionMap) { + int numberOfMessages = 0; + for (LongArrayList list : partitionMap.values()) { + numberOfMessages += list.size(); + } + return numberOfMessages; + } + + // FIXME -- complete these for check-pointing + @Override + protected void writeMessages(LongArrayList messages, DataOutput out) + throws IOException { + + } + + @Override + protected LongArrayList readFieldsForMessages(DataInput in) + throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java index 13292a2..37b8c05 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java @@ -139,6 +139,10 @@ public abstract class SimpleMessageStore getPartitionDestinationVertices(int partitionId) { ConcurrentMap partitionMap = map.get(partitionId); return (partitionMap == null) ? Collections.emptyList() : http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java index 18b7798..3000cd4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java @@ -83,6 +83,11 @@ public class DiskBackedMessageStore messages) throws IOException { @@ -106,6 +111,10 @@ public class DiskBackedMessageStore getVertexMessages(I vertexId) throws IOException { if (hasMessagesForVertex(vertexId)) { return getMessageStore(vertexId).getVertexMessages(vertexId); http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java index dbc1ce8..0012bf0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java @@ -94,6 +94,11 @@ public class IntByteArrayMessageStore } } + @Override + public boolean isPointerListEncoding() { + return false; + } + /** * Get map which holds messages for partition which vertex belongs to. * @@ -161,6 +166,10 @@ public class IntByteArrayMessageStore } @Override + public void finalizeStore() { + } + + @Override public void clearPartition(int partitionId) throws IOException { map.get(partitionId).clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java index be75ee8..97086e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java @@ -86,6 +86,11 @@ public class IntFloatMessageStore } } + @Override + public boolean isPointerListEncoding() { + return false; + } + /** * Get map which holds messages for partition which vertex belongs to. * @@ -126,6 +131,10 @@ public class IntFloatMessageStore } @Override + public void finalizeStore() { + } + + @Override public void clearPartition(int partitionId) throws IOException { map.get(partitionId).clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java deleted file mode 100644 index 3110864..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.messages.primitives; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.partition.Partition; -import org.apache.giraph.utils.VertexIdMessageBytesIterator; -import org.apache.giraph.utils.VertexIdMessageIterator; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.giraph.utils.VerboseByteStructMessageWrite; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Lists; - -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.longs.LongIterator; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -/** - * Special message store to be used when ids are LongWritable and no combiner - * is used. - * Uses fastutil primitive maps in order to decrease number of objects and - * get better performance. - * - * @param Message type - */ -public class LongByteArrayMessageStore - implements MessageStore { - /** Message value factory */ - protected final MessageValueFactory messageValueFactory; - /** Map from partition id to map from vertex id to message */ - private final - Int2ObjectOpenHashMap> map; - /** Service worker */ - private final CentralizedServiceWorker service; - /** Giraph configuration */ - private final ImmutableClassesGiraphConfiguration config; - - /** - * Constructor - * - * @param messageValueFactory Factory for creating message values - * @param service Service worker - * @param config Hadoop configuration - */ - public LongByteArrayMessageStore( - MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration - config) { - this.messageValueFactory = messageValueFactory; - this.service = service; - this.config = config; - - map = - new Int2ObjectOpenHashMap>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { - Partition partition = - service.getPartitionStore().getOrCreatePartition(partitionId); - Long2ObjectOpenHashMap partitionMap = - new Long2ObjectOpenHashMap( - (int) partition.getVertexCount()); - map.put(partitionId, partitionMap); - service.getPartitionStore().putPartition(partition); - } - } - - /** - * Get map which holds messages for partition which vertex belongs to. - * - * @param vertexId Id of the vertex - * @return Map which holds messages for partition which vertex belongs to. - */ - private Long2ObjectOpenHashMap getPartitionMap( - LongWritable vertexId) { - return map.get(service.getPartitionId(vertexId)); - } - - /** - * Get the DataInputOutput for a vertex id, creating if necessary. - * - * @param partitionMap Partition map to look in - * @param vertexId Id of the vertex - * @return DataInputOutput for this vertex id (created if necessary) - */ - private DataInputOutput getDataInputOutput( - Long2ObjectOpenHashMap partitionMap, - long vertexId) { - DataInputOutput dataInputOutput = partitionMap.get(vertexId); - if (dataInputOutput == null) { - dataInputOutput = config.createMessagesInputOutput(); - partitionMap.put(vertexId, dataInputOutput); - } - return dataInputOutput; - } - - @Override - public void addPartitionMessages(int partitionId, - VertexIdMessages messages) throws - IOException { - Long2ObjectOpenHashMap partitionMap = - map.get(partitionId); - synchronized (partitionMap) { - VertexIdMessageBytesIterator - vertexIdMessageBytesIterator = - messages.getVertexIdMessageBytesIterator(); - // Try to copy the message buffer over rather than - // doing a deserialization of a message just to know its size. This - // should be more efficient for complex objects where serialization is - // expensive. If this type of iterator is not available, fall back to - // deserializing/serializing the messages - if (vertexIdMessageBytesIterator != null) { - while (vertexIdMessageBytesIterator.hasNext()) { - vertexIdMessageBytesIterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - vertexIdMessageBytesIterator.getCurrentVertexId().get()); - vertexIdMessageBytesIterator.writeCurrentMessageBytes( - dataInputOutput.getDataOutput()); - } - } else { - VertexIdMessageIterator - iterator = messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - iterator.getCurrentVertexId().get()); - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); - } - } - } - } - - @Override - public void clearPartition(int partitionId) throws IOException { - map.get(partitionId).clear(); - } - - @Override - public boolean hasMessagesForVertex(LongWritable vertexId) { - return getPartitionMap(vertexId).containsKey(vertexId.get()); - } - - @Override - public Iterable getVertexMessages( - LongWritable vertexId) throws IOException { - DataInputOutput dataInputOutput = - getPartitionMap(vertexId).get(vertexId.get()); - if (dataInputOutput == null) { - return EmptyIterable.get(); - } else { - return new MessagesIterable(dataInputOutput, messageValueFactory); - } - } - - @Override - public void clearVertexMessages(LongWritable vertexId) throws IOException { - getPartitionMap(vertexId).remove(vertexId.get()); - } - - @Override - public void clearAll() throws IOException { - map.clear(); - } - - @Override - public Iterable getPartitionDestinationVertices( - int partitionId) { - Long2ObjectOpenHashMap partitionMap = - map.get(partitionId); - List vertices = - Lists.newArrayListWithCapacity(partitionMap.size()); - LongIterator iterator = partitionMap.keySet().iterator(); - while (iterator.hasNext()) { - vertices.add(new LongWritable(iterator.nextLong())); - } - return vertices; - } - - @Override - public void writePartition(DataOutput out, - int partitionId) throws IOException { - Long2ObjectOpenHashMap partitionMap = - map.get(partitionId); - out.writeInt(partitionMap.size()); - ObjectIterator> iterator = - partitionMap.long2ObjectEntrySet().fastIterator(); - while (iterator.hasNext()) { - Long2ObjectMap.Entry entry = iterator.next(); - out.writeLong(entry.getLongKey()); - entry.getValue().write(out); - } - } - - @Override - public void readFieldsForPartition(DataInput in, - int partitionId) throws IOException { - int size = in.readInt(); - Long2ObjectOpenHashMap partitionMap = - new Long2ObjectOpenHashMap(size); - while (size-- > 0) { - long vertexId = in.readLong(); - DataInputOutput dataInputOutput = config.createMessagesInputOutput(); - dataInputOutput.readFields(in); - partitionMap.put(vertexId, dataInputOutput); - } - synchronized (map) { - map.put(partitionId, partitionMap); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java index 264e65a..b0452c1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java @@ -83,6 +83,11 @@ public class LongDoubleMessageStore } } + @Override + public boolean isPointerListEncoding() { + return false; + } + /** * Get map which holds messages for partition which vertex belongs to. * @@ -123,6 +128,10 @@ public class LongDoubleMessageStore } @Override + public void finalizeStore() { + } + + @Override public void clearPartition(int partitionId) throws IOException { map.get(partitionId).clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java new file mode 100644 index 0000000..ae61de4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.messages.primitives.long_id; + +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.utils.VertexIdIterator; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import java.util.List; + +/** + * Special message store to be used when ids are LongWritable and no combiner + * is used. + * Uses fastutil primitive maps in order to decrease number of objects and + * get better performance. + * + * @param message type + * @param list type + */ +public abstract class LongAbstractListMessageStore extends LongAbstractMessageStore { + /** + * Map used to store messages for nascent vertices i.e., ones + * that did not exist at the start of current superstep but will get + * created because of sending message to a non-existent vertex id + */ + private final + Int2ObjectOpenHashMap> nascentMap; + + /** + * Constructor + * + * @param messageValueFactory Factory for creating message values + * @param service Service worker + * @param config Hadoop configuration + */ + public LongAbstractListMessageStore( + MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration config) { + super(messageValueFactory, service, config); + populateMap(); + + // create map for vertex ids (i.e., nascent vertices) not known yet + nascentMap = new Int2ObjectOpenHashMap<>(); + for (int partitionId : service.getPartitionStore().getPartitionIds()) { + nascentMap.put(partitionId, new Long2ObjectOpenHashMap()); + } + } + + /** + * Populate the map with all vertexIds for each partition + */ + private void populateMap() { // TODO - can parallelize? + // populate with vertex ids already known + for (int partitionId : service.getPartitionStore().getPartitionIds()) { + Partition partition = service.getPartitionStore() + .getOrCreatePartition(partitionId); + Long2ObjectOpenHashMap partitionMap = map.get(partitionId); + for (Vertex vertex : partition) { + partitionMap.put(vertex.getId().get(), createList()); + } + } + } + + /** + * Create an instance of L + * @return instance of L + */ + protected abstract L createList(); + + /** + * Get list for the current vertexId + * + * @param iterator vertexId iterator + * @return list for current vertexId + */ + protected L getList( + VertexIdIterator iterator) { + PartitionOwner owner = + service.getVertexPartitionOwner(iterator.getCurrentVertexId()); + long vertexId = iterator.getCurrentVertexId().get(); + int partitionId = owner.getPartitionId(); + Long2ObjectOpenHashMap partitionMap = map.get(partitionId); + if (!partitionMap.containsKey(vertexId)) { + synchronized (nascentMap) { + // assumption: not many nascent vertices are created + // so overall synchronization is negligible + Long2ObjectOpenHashMap nascentPartitionMap = + nascentMap.get(partitionId); + if (nascentPartitionMap.get(vertexId) == null) { + nascentPartitionMap.put(vertexId, createList()); + } + return nascentPartitionMap.get(vertexId); + } + } + return partitionMap.get(vertexId); + } + + @Override + public void finalizeStore() { + for (int partitionId : nascentMap.keySet()) { + // nascent vertices are present only in nascent map + map.get(partitionId).putAll(nascentMap.get(partitionId)); + } + nascentMap.clear(); + } + + // TODO - discussion + /* + some approaches for ensuring correctness with parallel inserts + - current approach: uses a small extra bit of memory by pre-populating + map & pushes everything map cannot handle to nascentMap + at the beginning of next superstep compute a single threaded finalizeStore is + called (so little extra memory + 1 sequential finish ops) + - used striped parallel fast utils instead (unsure of perf) + - use concurrent map (every get gets far slower) + - use reader writer locks (unsure of perf) + (code looks something like underneath) + + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + rwl.readLock().lock(); + L list = partitionMap.get(vertexId); + if (list == null) { + rwl.readLock().unlock(); + rwl.writeLock().lock(); + if (partitionMap.get(vertexId) == null) { + list = createList(); + partitionMap.put(vertexId, list); + } + rwl.readLock().lock(); + rwl.writeLock().unlock(); + } + rwl.readLock().unlock(); + - adopted from the article + http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\ + ReentrantReadWriteLock.html + */ +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java new file mode 100644 index 0000000..9ee090e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.messages.primitives.long_id; + +import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongIterator; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.partition.Partition; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.List; + +/** + * Special message store to be used when ids are LongWritable and no combiner + * is used. + * Uses fastutil primitive maps in order to decrease number of objects and + * get better performance. + * + * @param message type + * @param datastructure used to hold messages + */ +public abstract class LongAbstractMessageStore + implements MessageStore { + /** Message value factory */ + protected final MessageValueFactory messageValueFactory; + /** Map from partition id to map from vertex id to message */ + protected final + Int2ObjectOpenHashMap> map; + /** Service worker */ + protected final CentralizedServiceWorker service; + /** Giraph configuration */ + protected final ImmutableClassesGiraphConfiguration + config; + + /** + * Constructor + * + * @param messageValueFactory Factory for creating message values + * @param service Service worker + * @param config Hadoop configuration + */ + public LongAbstractMessageStore( + MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration + config) { + this.messageValueFactory = messageValueFactory; + this.service = service; + this.config = config; + + map = new Int2ObjectOpenHashMap<>(); + for (int partitionId : service.getPartitionStore().getPartitionIds()) { + Partition partition = + service.getPartitionStore().getOrCreatePartition(partitionId); + Long2ObjectOpenHashMap partitionMap = + new Long2ObjectOpenHashMap( + (int) partition.getVertexCount()); + map.put(partitionId, partitionMap); + service.getPartitionStore().putPartition(partition); + } + } + + /** + * Get map which holds messages for partition which vertex belongs to. + * + * @param vertexId Id of the vertex + * @return Map which holds messages for partition which vertex belongs to. + */ + protected Long2ObjectOpenHashMap getPartitionMap( + LongWritable vertexId) { + return map.get(service.getPartitionId(vertexId)); + } + + @Override + public void clearPartition(int partitionId) throws IOException { + map.get(partitionId).clear(); + } + + @Override + public boolean hasMessagesForVertex(LongWritable vertexId) { + return getPartitionMap(vertexId).containsKey(vertexId.get()); + } + + @Override + public void clearVertexMessages(LongWritable vertexId) throws IOException { + getPartitionMap(vertexId).remove(vertexId.get()); + } + + + @Override + public void clearAll() throws IOException { + map.clear(); + } + + @Override + public Iterable getPartitionDestinationVertices( + int partitionId) { + Long2ObjectOpenHashMap partitionMap = + map.get(partitionId); + List vertices = + Lists.newArrayListWithCapacity(partitionMap.size()); + LongIterator iterator = partitionMap.keySet().iterator(); + while (iterator.hasNext()) { + vertices.add(new LongWritable(iterator.nextLong())); + } + return vertices; + } + +}