From: aching@apache.org
To: commits@giraph.apache.org
Date: Thu, 30 Jul 2015 21:07:56 -0000
Subject: [2/2] git commit: updated refs/heads/trunk to 6f5a457

[GIRAPH-1023] Adding out-of-core messages to previously implemented adaptive out-of-core mechanism

Summary: This is the continuation of the previous diff on the out-of-core mechanism. It completes that diff by adding out-of-core messages, making the entire out-of-core mechanism a cohesive entity in Giraph. This diff also improves the API of PartitionStore with some minor refactoring: the current and incoming message stores now live in the partition store instead of in ServerData.
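The following Java sketch roughly illustrates the bookkeeping that moves into the partition store. It models the current/incoming message split under two simplifying assumptions: messages are plain strings grouped by partition id, and everything stays in memory. The class `SketchMessageData` is hypothetical. Its method names (`addPartitionIncomingMessages`, `addPartitionCurrentMessages`, `hasMessagesForPartition`, `prepareSuperstep`, `resetMessageStores`) are borrowed from this commit, but the bodies are illustrative and are not Giraph's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, in-memory model of the MessageData role. Payloads are
 * plain strings keyed by partition id; this is NOT Giraph's actual API.
 */
class SketchMessageData {
  /** Messages received in the previous superstep, consumed in this one. */
  private Map<Integer, List<String>> current = new HashMap<>();
  /** Messages arriving now, to be consumed in the next superstep. */
  private Map<Integer, List<String>> incoming = new HashMap<>();

  /** Buffers messages that vertices will read in the next superstep. */
  synchronized void addPartitionIncomingMessages(int partitionId,
      List<String> msgs) {
    incoming.computeIfAbsent(partitionId, id -> new ArrayList<>())
        .addAll(msgs);
  }

  /** Adds messages that are readable in the current superstep. */
  synchronized void addPartitionCurrentMessages(int partitionId,
      List<String> msgs) {
    current.computeIfAbsent(partitionId, id -> new ArrayList<>())
        .addAll(msgs);
  }

  /** A partition without an entry simply has no current messages. */
  synchronized boolean hasMessagesForPartition(int partitionId) {
    List<String> msgs = current.get(partitionId);
    return msgs != null && !msgs.isEmpty();
  }

  /** Returns the current messages of a partition (empty if none). */
  synchronized List<String> getCurrentMessages(int partitionId) {
    return current.getOrDefault(partitionId, new ArrayList<>());
  }

  /**
   * Superstep boundary: drop the consumed messages, promote the incoming
   * buffer to "current", and start a fresh, empty incoming buffer.
   */
  synchronized void prepareSuperstep() {
    current.clear();
    current = incoming;
    incoming = new HashMap<>();
  }

  /** Discards both buffers. */
  synchronized void resetMessageStores() {
    current = new HashMap<>();
    incoming = new HashMap<>();
  }
}
```

At each superstep boundary, `prepareSuperstep()` discards the consumed messages and promotes the incoming buffer. This is the same swap that the removed `ServerData.prepareSuperstep()` performed on `MessageStore` objects. The sketch never touches disk. In the commit itself, `CheckMemoryCallable` can also queue the message buffers of out-of-core partitions for flushing while free memory stays below `midFreeMemoryFraction * maxMemory`.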
Test Plan:
mvn clean verify
Running PageRank with the message combiner turned off on a large graph with limited memory does not fail.

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D42897

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6f5a457f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6f5a457f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6f5a457f

Branch: refs/heads/trunk
Commit: 6f5a457fa93ac8b53cc77afd0eb8729f1e5665af
Parents: 4f3551d
Author: Hassan Eslami
Authored: Thu Jul 30 14:03:12 2015 -0700
Committer: Avery Ching
Committed: Thu Jul 30 14:03:46 2015 -0700

----------------------------------------------------------------------
 CHANGELOG | 3 +
 .../java/org/apache/giraph/comm/ServerData.java | 91 +--
 .../ByteArrayMessagesPerVertexStore.java | 5 +-
 .../giraph/comm/messages/MessageData.java | 82 ++
 .../giraph/comm/messages/MessageStore.java | 8 +
 .../comm/messages/SimpleMessageStore.java | 5 +
 .../out_of_core/DiskBackedMessageStore.java | 7 +
 .../primitives/IdByteArrayMessageStore.java | 5 +
 .../primitives/IdOneMessagePerVertexStore.java | 5 +
 .../primitives/IntByteArrayMessageStore.java | 5 +
 .../primitives/IntFloatMessageStore.java | 5 +
 .../primitives/LongDoubleMessageStore.java | 5 +
 .../long_id/LongAbstractMessageStore.java | 6 +-
 .../queue/AsyncMessageStoreWrapper.java | 5 +
 .../NettyWorkerClientRequestProcessor.java | 2 +-
 .../giraph/comm/netty/NettyWorkerServer.java | 26 +-
 .../SendPartitionCurrentMessagesRequest.java | 4 +-
 .../requests/SendWorkerMessagesRequest.java | 4 +-
 .../SendWorkerOneMessageToManyRequest.java | 13 +-
 .../apache/giraph/graph/ComputeCallable.java | 1 +
 .../apache/giraph/graph/GraphTaskManager.java | 3 +-
 .../giraph/ooc/AdaptiveOutOfCoreEngine.java | 20 +-
 .../apache/giraph/ooc/CheckMemoryCallable.java | 42 +-
 .../giraph/ooc/DiskBackedPartitionStore.java | 750 ++++++++++++++-----
 .../giraph/ooc/OutOfCoreProcessorCallable.java | 31 +-
 .../apache/giraph/partition/PartitionData.java | 116 +++
 .../apache/giraph/partition/PartitionStore.java | 182 +++--
 .../giraph/partition/SimplePartitionStore.java | 39 +-
 .../apache/giraph/worker/BspServiceWorker.java | 10 +-
 .../apache/giraph/comm/RequestFailureTest.java | 7 +-
 .../org/apache/giraph/comm/RequestTest.java | 14 +-
 .../queue/AsyncMessageStoreWrapperTest.java | 5 +
 .../giraph/partition/TestPartitionStores.java | 59 +-
 .../java/org/apache/giraph/utils/MockUtils.java | 8 +-
 .../java/org/apache/giraph/TestOutOfCore.java | 38 +-
 35 files changed, 1169 insertions(+), 442 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c844f61..c2306e9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 1.2.0 - unreleased
 =======
+  GIRAPH-1023: Adding out-of-core messages to previously implemented adaptive out-of-core
+  mechanism (heslami via aching)
+
   GIRAPH-1022: Out-of-core mechanism for input superstep and graph data (heslami via aching)
 
   GIRAPH-1021: Missing progress report for graph mutations. (heslami via aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index eddfbc6..b177446 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -31,9 +31,6 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
@@ -65,19 +62,6 @@ public class ServerData conf;
   /** Partition store for this worker. */
   private volatile PartitionStore partitionStore;
-  /** Message store factory */
-  private final MessageStoreFactory>
-      messageStoreFactory;
-  /**
-   * Message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   */
-  private volatile MessageStore incomingMessageStore;
-  /**
-   * Message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   */
-  private volatile MessageStore currentMessageStore;
   /**
    * Map of partition ids to vertex mutations from other workers. These are
    * mutations that should be applied before execution of *current* super step.
@@ -122,18 +106,14 @@ public class ServerData service,
       ImmutableClassesGiraphConfiguration conf,
-      MessageStoreFactory>
-          messageStoreFactory,
       Mapper.Context context) {
     this.serviceWorker = service;
     this.conf = conf;
-    this.messageStoreFactory = messageStoreFactory;
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
       partitionStore = new DiskBackedPartitionStore(conf, context,
@@ -157,78 +137,27 @@ public class ServerData Message data
-   * @return Incoming message store
-   */
-  public MessageStore getIncomingMessageStore() {
-    return (MessageStore) incomingMessageStore;
-  }
-
-  /**
-   * Get message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   *
-   * @param Message data
-   * @return Current message store
-   */
-  public MessageStore getCurrentMessageStore() {
-    return (MessageStore) currentMessageStore;
-  }
-
-  /**
    * Re-initialize message stores.
    * Discards old values if any.
    * @throws IOException
    */
   public void resetMessageStores() throws IOException {
-    if (currentMessageStore != null) {
-      currentMessageStore.clearAll();
-      currentMessageStore = null;
-    }
-    if (incomingMessageStore != null) {
-      incomingMessageStore.clearAll();
-      incomingMessageStore = null;
-    }
-    prepareSuperstep();
+    getPartitionStore().resetMessageStores();
+    currentWorkerToWorkerMessages =
+        Collections.synchronizedList(new ArrayList());
+    incomingWorkerToWorkerMessages =
+        Collections.synchronizedList(new ArrayList());
   }
 
   /** Prepare for next super step */
   public void prepareSuperstep() {
-    if (currentMessageStore != null) {
-      try {
-        currentMessageStore.clearAll();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to clear previous message store");
-      }
-    }
-    currentMessageStore =
-        incomingMessageStore != null ? incomingMessageStore :
-        messageStoreFactory.newStore(conf.getIncomingMessageClasses());
-    incomingMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
-    // finalize current message-store before resolving mutations
-    currentMessageStore.finalizeStore();
-
+    partitionStore.prepareSuperstep();
     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
     incomingWorkerToWorkerMessages =
         Collections.synchronizedList(new ArrayList());
   }
 
   /**
-   * In case of async message store we have to wait for all messages
-   * to be processed before going into next superstep.
-   */
-  public void waitForComplete() {
-    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
-      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
-    }
-  }
-
-  /**
    * Get the vertex mutations (synchronize on the values)
    *
    * @return Vertex mutations
@@ -323,7 +252,8 @@ public class ServerData vertexMutations = entry.getValue();
       Vertex vertex = vertexResolver.resolve(vertexId,
           originalVertex, vertexMutations,
-          getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
+          getPartitionStore().getCurrentMessageStore()
+              .hasMessagesForVertex(entry.getKey()));
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("resolvePartitionMutations: Resolved vertex index " +
@@ -339,7 +269,8 @@ public class ServerData destinations = getCurrentMessageStore().
+      Iterable destinations = getPartitionStore().getCurrentMessageStore().
           getPartitionDestinationVertices(partitionId);
       if (!Iterables.isEmpty(destinations)) {
         for (I vertexId : destinations) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index dfb5683..29a0888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -191,13 +191,16 @@ public class ByteArrayMessagesPerVertexStore Vertex id
    * @param Message data
    */
-  private static class Factory
+  public static class Factory
       implements MessageStoreFactory> {
     /** Service worker */
     private CentralizedServiceWorker service;
     /** Hadoop configuration */
     private ImmutableClassesGiraphConfiguration config;
 
+    /** Constructor for reflection */
+    public Factory() { }
+
     /**
      * @param service Worker service
      * @param config Hadoop configuration

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
new file mode 100644
index 0000000..f974823
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Structure that keeps message information.
+ *
+ * @param Vertex id
+ */
+public interface MessageData {
+  /**
+   * Get message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   *
+   * @param Message data type
+   * @return Incoming message store
+   */
+  MessageStore getIncomingMessageStore();
+
+  /**
+   * Get message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
+   *
+   * @param Message data type
+   * @return Current message store
+   */
+  MessageStore getCurrentMessageStore();
+
+  /**
+   * Re-initialize message stores.
+   * Discards old values if any.
+
+   * @throws IOException
+   */
+  void resetMessageStores() throws IOException;
+
+  /**
+   * Adds messages for partition to current message store
+   *
+   * @param Message data type
+   * @param partitionId Id of partition
+   * @param messages Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  void addPartitionCurrentMessages(
+      int partitionId, VertexIdMessages messages)
+      throws IOException;
+
+  /**
+   * Adds messages for partition to incoming message store
+   *
+   * @param Message data type
+   * @param partitionId Id of partition
+   * @param messages Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  void addPartitionIncomingMessages(
+      int partitionId, VertexIdMessages messages)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 6f1179a..6e85ea3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -76,6 +76,14 @@ public interface MessageStore getPartitionDestinationVertices(int partitionId) {
     PartitionDiskBackedMessageStore messageStore =
         partitionMessageStores.get(partitionId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 0732079..e1e7a3f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -186,6 +186,11 @@ public class IdByteArrayMessageStore getVertexMessages(I vertexId) throws IOException {
     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
     if (dataInputOutput == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index a61536f..b172d24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -162,6 +162,11 @@ public class IdOneMessagePerVertexStore getVertexMessages(
       I vertexId) throws IOException {
     Basic2ObjectMap partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index a8c19be..2fbc35c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -177,6 +177,11 @@ public class IntByteArrayMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable getVertexMessages(
       IntWritable vertexId) throws IOException {
     DataInputOutput dataInputOutput =

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 7a4ed09..3186224 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -138,6 +138,11 @@ public class IntFloatMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable getVertexMessages(
       IntWritable vertexId) throws IOException {
     Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 069face..6278c16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -139,6 +139,11 @@ public class LongDoubleMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public Iterable getVertexMessages(
       LongWritable vertexId) throws IOException {
     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index 50e8818..d8e5246 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -101,6 +101,11 @@ public abstract class LongAbstractMessageStore
   }
 
   @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    return !map.get(partitionId).isEmpty();
+  }
+
+  @Override
   public void clearVertexMessages(LongWritable vertexId) throws IOException {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
@@ -124,5 +129,4 @@ public abstract class LongAbstractMessageStore
     }
     return vertices;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 252ee39..e820f26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -142,6 +142,11 @@ public final class AsyncMessageStoreWrapper messages) throws IOException {
     int hash = partition2Queue.get(partitionId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 1cd1bd6..e9b072a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -210,7 +210,7 @@ public class NettyWorkerClientRequestProcessor partition) {
     final int partitionId = partition.getId();
     MessageStore messageStore =
-        serverData.getCurrentMessageStore();
+        serverData.getPartitionStore().getCurrentMessageStore();
     ByteArrayVertexIdMessages vertexIdMessages =
         new ByteArrayVertexIdMessages(
             configuration.createOutgoingMessageValueFactory());

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 37fb246..600cb1a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,11 +21,8 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -33,8 +30,6 @@ import org.apache.log4j.Logger;
 
 import java.net.InetSocketAddress;
 
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
-
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
  * the actual {@link ServerData}.
@@ -78,8 +73,7 @@ public class NettyWorkerServer(service, conf, createMessageStoreFactory(),
-            context);
+        new ServerData(service, conf, context);
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData),
@@ -87,24 +81,6 @@ public class NettyWorkerServer>
-      createMessageStoreFactory() {
-    Class messageStoreFactoryClass =
-        MESSAGE_STORE_FACTORY_CLASS.get(conf);
-
-    MessageStoreFactory messageStoreFactoryInstance =
-        ReflectionUtils.newInstance(messageStoreFactoryClass);
-    messageStoreFactoryInstance.initialize(service, conf);
-
-    return messageStoreFactoryInstance;
-  }
-
   @Override
   public InetSocketAddress getMyAddress() {
     return nettyServer.getMyAddress();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index b59d0cf..ab66aa3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -86,8 +86,8 @@ public class SendPartitionCurrentMessagesRequest serverData) {
     try {
-      serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
-          vertexIdMessageMap);
+      serverData.getPartitionStore()
+          .addPartitionCurrentMessages(partitionId, vertexIdMessageMap);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 6953998..e9be327 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -71,8 +71,8 @@ public class SendWorkerMessagesRequest messageStore = serverData.getIncomingMessageStore();
-    if (messageStore.isPointerListEncoding()) {
+    if (serverData.getPartitionStore().getIncomingMessageStore()
+        .isPointerListEncoding()) {
       // if message store is pointer list based then send data as is
-      messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+      serverData.getPartitionStore()
+          .addPartitionIncomingMessages(-1, oneMessageToManyIds);
     } else {
       // else split the data per partition and send individually
       CentralizedServiceWorker serviceWorker = serverData.getServiceWorker();
@@ -144,8 +144,9 @@ public class SendWorkerOneMessageToManyRequest idMsgs : partitionIdMsgs.entrySet()) {
         if (!idMsgs.getValue().isEmpty()) {
-          serverData.getIncomingMessageStore().addPartitionMessages(
-              idMsgs.getKey(), idMsgs.getValue());
+          serverData.getPartitionStore()
+              .addPartitionIncomingMessages(idMsgs.getKey(),
+                  idMsgs.getValue());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 923e427..fa268da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -103,6 +103,7 @@ public class ComputeCallable messageStore =
-        serviceWorker.getServerData().getCurrentMessageStore();
+        serviceWorker.getServerData().getPartitionStore()
+            .getCurrentMessageStore();
     int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
     int numThreads = Math.min(numComputeThreads, numPartitions);
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
index 8d3cab6..749e41e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
@@ -79,15 +79,21 @@ public class AdaptiveOutOfCoreEngine partitionsWithInputVertices;
   /**
    * List of partitions that are on disk, and their loaded *edges* during
-   * INPUT_SUPERSTEP are ready to flush to disk
+   * INPUT_SUPERSTEP are ready to be flushed to disk
    */
   private final BlockingQueue partitionsWithInputEdges;
-  /** Number of partitions to be written to the disk */
+  /**
+   * List of partitions that are on disk, and their message buffers (either
+   * messages for current superstep, or incoming messages for next superstep)
+   * are ready to be flushed to disk
+   */
+  private final BlockingQueue partitionsWithPendingMessages;
+  /** Number of partitions to be written to disk */
   private final AtomicInteger numPartitionsToSpill;
   /** Executor service for check memory thread */
@@ -128,6 +134,7 @@ public class AdaptiveOutOfCoreEngine(100);
     this.partitionsWithInputEdges = new ArrayBlockingQueue(100);
+    this.partitionsWithPendingMessages = new ArrayBlockingQueue(100);
     this.numPartitionsToSpill = new AtomicInteger(0);
   }
 
@@ -217,6 +224,13 @@ public class AdaptiveOutOfCoreEngine getPartitionsWithPendingMessages() {
+    return partitionsWithPendingMessages;
+  }
+
+  /**
    * @return number of partitions to spill to disk
    */
   public AtomicInteger getNumPartitionsToSpill() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
index 7f52490..2b2c990 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
@@ -232,7 +232,6 @@ public class CheckMemoryCallable partitionsWithInputEdges = oocEngine.getPartitionsWithInputEdges();
+    BlockingQueue partitionsWithPendingMessages =
+        oocEngine.getPartitionsWithPendingMessages();
     AtomicInteger numPartitionsToSpill = oocEngine.getNumPartitionsToSpill();
-
     while (freeMemory < midFreeMemoryFraction * maxMemory) {
       // Offload input vertex buffer of OOC partitions if we are in
       // INPUT_SUPERSTEP
@@ -276,7 +276,7 @@ public class CheckMemoryCallable pairs =
             partitionStore.getOocPartitionIdsWithPendingInputVertices();
-        freeMemory -= createCommand(pairs, partitionsWithInputVertices);
+        freeMemory -= createCommands(pairs, partitionsWithInputVertices);
       }
 
       // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP
@@ -284,7 +284,15 @@ public class CheckMemoryCallable pairs =
             partitionStore.getOocPartitionIdsWithPendingInputEdges();
-        freeMemory -= createCommand(pairs, partitionsWithInputEdges);
+        freeMemory -= createCommands(pairs, partitionsWithInputEdges);
+      }
+
+      // Offload message buffers of OOC partitions if we are still low in
+      // free memory
+      if (freeMemory < midFreeMemoryFraction * maxMemory) {
+        PairList pairs =
+            partitionStore.getOocPartitionIdsWithPendingMessages();
+        freeMemory -= createCommands(pairs, partitionsWithPendingMessages);
       }
 
       // Offload partitions if we are still low in free memory
@@ -295,14 +303,16 @@ public class CheckMemoryCallable pairs,
+  private double createCommands(PairList pairs,
       BlockingQueue commands) {
     double usedMemory = 0;
     if (pairs.getSize() != 0) {