[GIRAPH-1023] Adding out-of-core messages to previously implemented adaptive out-of-core mechanism
Summary:
This is the continuation of the previous diff on the out-of-core mechanism. This diff completes
the previous one by adding out-of-core messages, making the entire out-of-core mechanism a
cohesive entity in Giraph.
This diff also improves the API of PartitionStore by some minor refactoring.
Test Plan:
mvn clean verify
Running PageRank with the message combiner turned off on a large graph with limited memory
does not fail.
Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis
Reviewed By: dionysis.logothetis
Differential Revision: https://reviews.facebook.net/D42897
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6f5a457f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6f5a457f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6f5a457f
Branch: refs/heads/trunk
Commit: 6f5a457fa93ac8b53cc77afd0eb8729f1e5665af
Parents: 4f3551d
Author: Hassan Eslami <heslami@fb.com>
Authored: Thu Jul 30 14:03:12 2015 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Thu Jul 30 14:03:46 2015 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +
.../java/org/apache/giraph/comm/ServerData.java | 91 +--
.../ByteArrayMessagesPerVertexStore.java | 5 +-
.../giraph/comm/messages/MessageData.java | 82 ++
.../giraph/comm/messages/MessageStore.java | 8 +
.../comm/messages/SimpleMessageStore.java | 5 +
.../out_of_core/DiskBackedMessageStore.java | 7 +
.../primitives/IdByteArrayMessageStore.java | 5 +
.../primitives/IdOneMessagePerVertexStore.java | 5 +
.../primitives/IntByteArrayMessageStore.java | 5 +
.../primitives/IntFloatMessageStore.java | 5 +
.../primitives/LongDoubleMessageStore.java | 5 +
.../long_id/LongAbstractMessageStore.java | 6 +-
.../queue/AsyncMessageStoreWrapper.java | 5 +
.../NettyWorkerClientRequestProcessor.java | 2 +-
.../giraph/comm/netty/NettyWorkerServer.java | 26 +-
.../SendPartitionCurrentMessagesRequest.java | 4 +-
.../requests/SendWorkerMessagesRequest.java | 4 +-
.../SendWorkerOneMessageToManyRequest.java | 13 +-
.../apache/giraph/graph/ComputeCallable.java | 1 +
.../apache/giraph/graph/GraphTaskManager.java | 3 +-
.../giraph/ooc/AdaptiveOutOfCoreEngine.java | 20 +-
.../apache/giraph/ooc/CheckMemoryCallable.java | 42 +-
.../giraph/ooc/DiskBackedPartitionStore.java | 750 ++++++++++++++-----
.../giraph/ooc/OutOfCoreProcessorCallable.java | 31 +-
.../apache/giraph/partition/PartitionData.java | 116 +++
.../apache/giraph/partition/PartitionStore.java | 182 +++--
.../giraph/partition/SimplePartitionStore.java | 39 +-
.../apache/giraph/worker/BspServiceWorker.java | 10 +-
.../apache/giraph/comm/RequestFailureTest.java | 7 +-
.../org/apache/giraph/comm/RequestTest.java | 14 +-
.../queue/AsyncMessageStoreWrapperTest.java | 5 +
.../giraph/partition/TestPartitionStores.java | 59 +-
.../java/org/apache/giraph/utils/MockUtils.java | 8 +-
.../java/org/apache/giraph/TestOutOfCore.java | 38 +-
35 files changed, 1169 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c844f61..c2306e9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,9 @@ Giraph Change Log
Release 1.2.0 - unreleased
=======
+ GIRAPH-1023: Adding out-of-core messages to previously implemented adaptive out-of-core
+ mechanism (heslami via aching)
+
GIRAPH-1022: Out-of-core mechanism for input superstep and graph data (heslami via aching)
GIRAPH-1021: Missing progress report for graph mutations. (heslami via aching)
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index eddfbc6..b177446 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -31,9 +31,6 @@ import com.google.common.collect.Maps;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
@@ -65,19 +62,6 @@ public class ServerData<I extends WritableComparable,
private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Partition store for this worker. */
private volatile PartitionStore<I, V, E> partitionStore;
- /** Message store factory */
- private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
- messageStoreFactory;
- /**
- * Message store for incoming messages (messages which will be consumed
- * in the next super step)
- */
- private volatile MessageStore<I, Writable> incomingMessageStore;
- /**
- * Message store for current messages (messages which we received in
- * previous super step and which will be consumed in current super step)
- */
- private volatile MessageStore<I, Writable> currentMessageStore;
/**
* Map of partition ids to vertex mutations from other workers. These are
* mutations that should be applied before execution of *current* super step.
@@ -122,18 +106,14 @@ public class ServerData<I extends WritableComparable,
*
* @param service Service worker
* @param conf Configuration
- * @param messageStoreFactory Factory for message stores
* @param context Mapper context
*/
public ServerData(
CentralizedServiceWorker<I, V, E> service,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
- MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
- messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
this.serviceWorker = service;
this.conf = conf;
- this.messageStoreFactory = messageStoreFactory;
if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
partitionStore =
new DiskBackedPartitionStore<I, V, E>(conf, context,
@@ -157,78 +137,27 @@ public class ServerData<I extends WritableComparable,
}
/**
- * Get message store for incoming messages (messages which will be consumed
- * in the next super step)
- *
- * @param <M> Message data
- * @return Incoming message store
- */
- public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
- return (MessageStore<I, M>) incomingMessageStore;
- }
-
- /**
- * Get message store for current messages (messages which we received in
- * previous super step and which will be consumed in current super step)
- *
- * @param <M> Message data
- * @return Current message store
- */
- public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
- return (MessageStore<I, M>) currentMessageStore;
- }
-
- /**
* Re-initialize message stores.
* Discards old values if any.
* @throws IOException
*/
public void resetMessageStores() throws IOException {
- if (currentMessageStore != null) {
- currentMessageStore.clearAll();
- currentMessageStore = null;
- }
- if (incomingMessageStore != null) {
- incomingMessageStore.clearAll();
- incomingMessageStore = null;
- }
- prepareSuperstep();
+ getPartitionStore().resetMessageStores();
+ currentWorkerToWorkerMessages =
+ Collections.synchronizedList(new ArrayList<Writable>());
+ incomingWorkerToWorkerMessages =
+ Collections.synchronizedList(new ArrayList<Writable>());
}
/** Prepare for next super step */
public void prepareSuperstep() {
- if (currentMessageStore != null) {
- try {
- currentMessageStore.clearAll();
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to clear previous message store");
- }
- }
- currentMessageStore =
- incomingMessageStore != null ? incomingMessageStore :
- messageStoreFactory.newStore(conf.getIncomingMessageClasses());
- incomingMessageStore =
- messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
- // finalize current message-store before resolving mutations
- currentMessageStore.finalizeStore();
-
+ partitionStore.prepareSuperstep();
currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
incomingWorkerToWorkerMessages =
Collections.synchronizedList(new ArrayList<Writable>());
}
/**
- * In case of async message store we have to wait for all messages
- * to be processed before going into next superstep.
- */
- public void waitForComplete() {
- if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
- ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
- }
- }
-
- /**
* Get the vertex mutations (synchronize on the values)
*
* @return Vertex mutations
@@ -323,7 +252,8 @@ public class ServerData<I extends WritableComparable,
VertexMutations<I, V, E> vertexMutations = entry.getValue();
Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
originalVertex, vertexMutations,
- getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
+ getPartitionStore().getCurrentMessageStore()
+ .hasMessagesForVertex(entry.getKey()));
if (LOG.isDebugEnabled()) {
LOG.debug("resolvePartitionMutations: Resolved vertex index " +
@@ -339,7 +269,8 @@ public class ServerData<I extends WritableComparable,
} else if (originalVertex != null) {
partition.removeVertex(vertexId);
try {
- getCurrentMessageStore().clearVertexMessages(vertexId);
+ getPartitionStore().getCurrentMessageStore()
+ .clearVertexMessages(vertexId);
} catch (IOException e) {
throw new IllegalStateException("resolvePartitionMutations: " +
"Caught IOException while clearing messages for a deleted " +
@@ -352,7 +283,7 @@ public class ServerData<I extends WritableComparable,
// Keep track of vertices which are not here in the partition, but have
// received messages
- Iterable<I> destinations = getCurrentMessageStore().
+ Iterable<I> destinations = getPartitionStore().getCurrentMessageStore().
getPartitionDestinationVertices(partitionId);
if (!Iterables.isEmpty(destinations)) {
for (I vertexId : destinations) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index dfb5683..29a0888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -191,13 +191,16 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @param <I> Vertex id
* @param <M> Message data
*/
- private static class Factory<I extends WritableComparable, M extends Writable>
+ public static class Factory<I extends WritableComparable, M extends Writable>
implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+ /** Constructor for reflection */
+ public Factory() { }
+
/**
* @param service Worker service
* @param config Hadoop configuration
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
new file mode 100644
index 0000000..f974823
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Structure that keeps message information.
+ *
+ * @param <I> Vertex id
+ */
+public interface MessageData<I extends WritableComparable> {
+ /**
+ * Get message store for incoming messages (messages which will be consumed
+ * in the next super step)
+ *
+ * @param <M> Message data type
+ * @return Incoming message store
+ */
+ <M extends Writable> MessageStore<I, M> getIncomingMessageStore();
+
+ /**
+ * Get message store for current messages (messages which we received in
+ * previous super step and which will be consumed in current super step)
+ *
+ * @param <M> Message data type
+ * @return Current message store
+ */
+ <M extends Writable> MessageStore<I, M> getCurrentMessageStore();
+
+ /**
+ * Re-initialize message stores.
+ * Discards old values if any.
+ *
+ * @throws IOException
+ */
+ void resetMessageStores() throws IOException;
+
+ /**
+ * Adds messages for partition to current message store
+ *
+ * @param <M> Message data type
+ * @param partitionId Id of partition
+ * @param messages Collection of vertex ids and messages we want to add
+ * @throws IOException
+ */
+ <M extends Writable> void addPartitionCurrentMessages(
+ int partitionId, VertexIdMessages<I, M> messages)
+ throws IOException;
+
+ /**
+ * Adds messages for partition to incoming message store
+ *
+ * @param <M> Message data type
+ * @param partitionId Id of partition
+ * @param messages Collection of vertex ids and messages we want to add
+ * @throws IOException
+ */
+ <M extends Writable> void addPartitionIncomingMessages(
+ int partitionId, VertexIdMessages<I, M> messages)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 6f1179a..6e85ea3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -76,6 +76,14 @@ public interface MessageStore<I extends WritableComparable,
boolean hasMessagesForVertex(I vertexId);
/**
+ * Check if we have messages for some partition
+ *
+ * @param partitionId Id of partition which we want to check
+ * @return True iff we have messages for the given partition
+ */
+ boolean hasMessagesForPartition(int partitionId);
+
+ /**
* Adds messages for partition
*
* @param partitionId Id of partition
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 37b8c05..a1d3625 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -211,6 +211,11 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !map.get(partitionId).isEmpty();
+ }
+
+ @Override
public void clearAll() throws IOException {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 3351051..b28d15b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
/**
@@ -130,6 +131,12 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !Iterables
+ .isEmpty(getMessageStore(partitionId).getDestinationVertices());
+ }
+
+ @Override
public Iterable<I> getPartitionDestinationVertices(int partitionId) {
PartitionDiskBackedMessageStore<I, M> messageStore =
partitionMessageStores.get(partitionId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 0732079..e1e7a3f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -186,6 +186,11 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return map.get(partitionId).size() != 0;
+ }
+
+ @Override
public Iterable<M> getVertexMessages(I vertexId) throws IOException {
DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
if (dataInputOutput == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index a61536f..b172d24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -162,6 +162,11 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return map.get(partitionId).size() != 0;
+ }
+
+ @Override
public Iterable<M> getVertexMessages(
I vertexId) throws IOException {
Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index a8c19be..2fbc35c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -177,6 +177,11 @@ public class IntByteArrayMessageStore<M extends Writable>
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !map.get(partitionId).isEmpty();
+ }
+
+ @Override
public Iterable<M> getVertexMessages(
IntWritable vertexId) throws IOException {
DataInputOutput dataInputOutput =
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 7a4ed09..3186224 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -138,6 +138,11 @@ public class IntFloatMessageStore
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !map.get(partitionId).isEmpty();
+ }
+
+ @Override
public Iterable<FloatWritable> getVertexMessages(
IntWritable vertexId) throws IOException {
Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 069face..6278c16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -139,6 +139,11 @@ public class LongDoubleMessageStore
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !map.get(partitionId).isEmpty();
+ }
+
+ @Override
public Iterable<DoubleWritable> getVertexMessages(
LongWritable vertexId) throws IOException {
Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index 50e8818..d8e5246 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -101,6 +101,11 @@ public abstract class LongAbstractMessageStore<M extends Writable,
T>
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return !map.get(partitionId).isEmpty();
+ }
+
+ @Override
public void clearVertexMessages(LongWritable vertexId) throws IOException {
getPartitionMap(vertexId).remove(vertexId.get());
}
@@ -124,5 +129,4 @@ public abstract class LongAbstractMessageStore<M extends Writable,
T>
}
return vertices;
}
-
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 252ee39..e820f26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -142,6 +142,11 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
}
@Override
+ public boolean hasMessagesForPartition(int partitionId) {
+ return store.hasMessagesForPartition(partitionId);
+ }
+
+ @Override
public void addPartitionMessages(
int partitionId, VertexIdMessages<I, M> messages) throws IOException {
int hash = partition2Queue.get(partitionId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 1cd1bd6..e9b072a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -210,7 +210,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
Partition<I, V, E> partition) {
final int partitionId = partition.getId();
MessageStore<I, Writable> messageStore =
- serverData.getCurrentMessageStore();
+ serverData.getPartitionStore().getCurrentMessageStore();
ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
new ByteArrayVertexIdMessages<I, Writable>(
configuration.createOutgoingMessageValueFactory());
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 37fb246..600cb1a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,11 +21,8 @@ package org.apache.giraph.comm.netty;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -33,8 +30,6 @@ import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
-
/**
* Netty worker server that implement {@link WorkerServer} and contains
* the actual {@link ServerData}.
@@ -78,8 +73,7 @@ public class NettyWorkerServer<I extends WritableComparable,
this.context = context;
serverData =
- new ServerData<I, V, E>(service, conf, createMessageStoreFactory(),
- context);
+ new ServerData<I, V, E>(service, conf, context);
nettyServer = new NettyServer(conf,
new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
@@ -87,24 +81,6 @@ public class NettyWorkerServer<I extends WritableComparable,
nettyServer.start();
}
- /**
- * Decide which message store should be used for current application,
- * and create the factory for that store
- *
- * @return Message store factory
- */
- private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
- createMessageStoreFactory() {
- Class<? extends MessageStoreFactory> messageStoreFactoryClass =
- MESSAGE_STORE_FACTORY_CLASS.get(conf);
-
- MessageStoreFactory messageStoreFactoryInstance =
- ReflectionUtils.newInstance(messageStoreFactoryClass);
- messageStoreFactoryInstance.initialize(service, conf);
-
- return messageStoreFactoryInstance;
- }
-
@Override
public InetSocketAddress getMyAddress() {
return nettyServer.getMyAddress();
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index b59d0cf..ab66aa3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -86,8 +86,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
@Override
public void doRequest(ServerData<I, V, E> serverData) {
try {
- serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
- vertexIdMessageMap);
+ serverData.getPartitionStore()
+ .addPartitionCurrentMessages(partitionId, vertexIdMessageMap);
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 6953998..e9be327 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -71,8 +71,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
while (iterator.hasNext()) {
iterator.next();
try {
- serverData.getIncomingMessageStore().
- addPartitionMessages(iterator.getCurrentFirst(),
+ serverData.getPartitionStore().
+ addPartitionIncomingMessages(iterator.getCurrentFirst(),
iterator.getCurrentSecond());
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
index f8d0473..aeb1b1d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -27,7 +27,6 @@ import java.util.Map.Entry;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
@@ -94,10 +93,11 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
@Override
public void doRequest(ServerData serverData) {
try {
- MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
- if (messageStore.isPointerListEncoding()) {
+ if (serverData.getPartitionStore().getIncomingMessageStore()
+ .isPointerListEncoding()) {
// if message store is pointer list based then send data as is
- messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+ serverData.getPartitionStore()
+ .addPartitionIncomingMessages(-1, oneMessageToManyIds);
} else { // else split the data per partition and send individually
CentralizedServiceWorker<I, ?, ?> serviceWorker =
serverData.getServiceWorker();
@@ -144,8 +144,9 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
partitionIdMsgs.entrySet()) {
if (!idMsgs.getValue().isEmpty()) {
- serverData.getIncomingMessageStore().addPartitionMessages(
- idMsgs.getKey(), idMsgs.getValue());
+ serverData.getPartitionStore()
+ .addPartitionIncomingMessages(idMsgs.getKey(),
+ idMsgs.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 923e427..fa268da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -103,6 +103,7 @@ public class ComputeCallable<I extends WritableComparable, V extends
Writable,
/**
* Constructor
+ *
* @param context Context
* @param graphState Current graph state (use to create own graph state)
* @param messageStore Message store
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 0844858..3c09957 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -332,7 +332,8 @@ end[PURE_YARN]*/
prepareForSuperstep(graphState);
context.progress();
MessageStore<I, Writable> messageStore =
- serviceWorker.getServerData().getCurrentMessageStore();
+ serviceWorker.getServerData().getPartitionStore()
+ .getCurrentMessageStore();
int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
int numThreads = Math.min(numComputeThreads, numPartitions);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
index 8d3cab6..749e41e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
@@ -79,15 +79,21 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
// ---- OOC Commands ----
/**
* List of partitions that are on disk, and their loaded *vertices* during
- * INPUT_SUPERSTEP are ready to flush to disk
+ * INPUT_SUPERSTEP are ready to be flushed to disk
*/
private final BlockingQueue<Integer> partitionsWithInputVertices;
/**
* List of partitions that are on disk, and their loaded *edges* during
- * INPUT_SUPERSTEP are ready to flush to disk
+ * INPUT_SUPERSTEP are ready to be flushed to disk
*/
private final BlockingQueue<Integer> partitionsWithInputEdges;
- /** Number of partitions to be written to the disk */
+ /**
+ * List of partitions that are on disk, and their message buffers (either
+ * messages for current superstep, or incoming messages for next superstep)
+ * are ready to be flushed to disk
+ */
+ private final BlockingQueue<Integer> partitionsWithPendingMessages;
+ /** Number of partitions to be written to disk */
private final AtomicInteger numPartitionsToSpill;
/** Executor service for check memory thread */
@@ -128,6 +134,7 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
this.done = false;
this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100);
this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100);
+ this.partitionsWithPendingMessages = new ArrayBlockingQueue<Integer>(100);
this.numPartitionsToSpill = new AtomicInteger(0);
}
@@ -217,6 +224,13 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
}
/**
+ * @return partitions whose message buffers are ready to be flushed to disk.
+ */
+ public BlockingQueue<Integer> getPartitionsWithPendingMessages() {
+ return partitionsWithPendingMessages;
+ }
+
+ /**
* @return number of partitions to spill to disk
*/
public AtomicInteger getNumPartitionsToSpill() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6f5a457f/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
index 7f52490..2b2c990 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
@@ -232,7 +232,6 @@ public class CheckMemoryCallable<I extends WritableComparable,
freeMemory));
}
}
-
// If we have enough memory, we roll back the latest shrink in number of
// partition slots.
// If we do not have enough memory, but we are not in a bad scenario
@@ -265,9 +264,10 @@ public class CheckMemoryCallable<I extends WritableComparable,
oocEngine.getPartitionsWithInputVertices();
BlockingQueue<Integer> partitionsWithInputEdges =
oocEngine.getPartitionsWithInputEdges();
+ BlockingQueue<Integer> partitionsWithPendingMessages =
+ oocEngine.getPartitionsWithPendingMessages();
AtomicInteger numPartitionsToSpill =
oocEngine.getNumPartitionsToSpill();
-
while (freeMemory < midFreeMemoryFraction * maxMemory) {
// Offload input vertex buffer of OOC partitions if we are in
// INPUT_SUPERSTEP
@@ -276,7 +276,7 @@ public class CheckMemoryCallable<I extends WritableComparable,
// vertex buffers of that partition).
PairList<Integer, Integer> pairs =
partitionStore.getOocPartitionIdsWithPendingInputVertices();
- freeMemory -= createCommand(pairs, partitionsWithInputVertices);
+ freeMemory -= createCommands(pairs, partitionsWithInputVertices);
}
// Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP
@@ -284,7 +284,15 @@ public class CheckMemoryCallable<I extends WritableComparable,
serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
PairList<Integer, Integer> pairs =
partitionStore.getOocPartitionIdsWithPendingInputEdges();
- freeMemory -= createCommand(pairs, partitionsWithInputEdges);
+ freeMemory -= createCommands(pairs, partitionsWithInputEdges);
+ }
+
+ // Offload message buffers of OOC partitions if we are still low in
+ // free memory
+ if (freeMemory < midFreeMemoryFraction * maxMemory) {
+ PairList<Integer, Integer> pairs =
+ partitionStore.getOocPartitionIdsWithPendingMessages();
+ freeMemory -= createCommands(pairs, partitionsWithPendingMessages);
}
// Offload partitions if we are still low in free memory
@@ -295,14 +303,16 @@ public class CheckMemoryCallable<I extends WritableComparable,
if (!partitionsWithInputVertices.isEmpty() ||
!partitionsWithInputEdges.isEmpty() ||
+ !partitionsWithPendingMessages.isEmpty() ||
numPartitionsToSpill.get() != 0) {
if (LOG.isInfoEnabled()) {
LOG.info("call: signal out-of-core processor threads to start " +
- "offloading. These threads will spill vertex buffer of " +
+ "offloading. These threads will spill vertex buffers of " +
partitionsWithInputVertices.size() + " partitions, edge " +
"buffers of " + partitionsWithInputEdges.size() +
- " partitions, and " + numPartitionsToSpill.get() + " whole " +
- "partition");
+ " partitions, pending message buffers of " +
+ partitionsWithPendingMessages.size() + " partitions, and " +
+ numPartitionsToSpill.get() + " whole partitions");
}
// Opening the gate for OOC processing threads to start spilling
// data on disk
@@ -356,7 +366,7 @@ public class CheckMemoryCallable<I extends WritableComparable,
if (LOG.isInfoEnabled()) {
LOG.info("call: " +
(gcDone ?
- ("GC is done. " + String.format("GC time = %.2f sec.",
+ ("GC is done. " + String.format("GC time = %.2f sec. ",
(System.currentTimeMillis() - gcStartTime) / 1000.0)) :
"") +
"Finished offloading data to disk. " +
@@ -432,18 +442,20 @@ public class CheckMemoryCallable<I extends WritableComparable,
}
/**
- * Generates the command for a particular type of data we want to offload to
- * disk.
+ * Generate commands for out-of-core processor threads based on the
+ * (partitionId, memory foot-print) pairs we have on a particular type of data
+ * (either vertex buffer, edge buffer, or message buffer).
*
- * @param pairs list of pair(partitionId, approximate foot-print that is going
- * of be reduced by offloading the particular data of a
+ * @param pairs list of pairs (partitionId, estimated memory foot-print that
+ * is going to be reduced by offloading the particular data of a
* partition)
- * @param commands list of partitionIds for which we want to execute the
- * command
+ * @param commands commands to generate for out-of-core processor threads. A
+ * command is a partition id, for which the appropriate data
+ * should be flushed to disk.
* @return approximate amount of memory (in MB) that is going to be freed up
* after executing the generated commands
*/
- private double createCommand(PairList<Integer, Integer> pairs,
+ private double createCommands(PairList<Integer, Integer> pairs,
BlockingQueue<Integer> commands) {
double usedMemory = 0;
if (pairs.getSize() != 0) {
|