Hi Denis,

Thanks for trying out the new OOC design. As you mentioned, when a message combiner is used we keep only one message per vertex at all times. That means the storage needed for messages is #ofVertices * (SizeOfVertexId + SizeOfOneMessage). We studied several applications that use a message combiner and noticed that in those applications the message type is rather simple (usually a double, a pair of doubles, or a similar type). The vertex ID is also usually simple (a long value in most cases). That means we keep only 16-20 bytes per vertex for its message. We could offload messages when a message combiner is present (and in fact it would be very simple to do), but we noticed that we achieve much better performance if we do not. In other words, not offloading messages when a message combiner is used was intentional. Note that message flow control is still in effect (and much needed) even with message combiners.
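
To put the numbers in perspective, here is a rough back-of-the-envelope sketch in Java (the vertex count and type sizes below are hypothetical placeholders, not figures from your job, and JVM object overhead is ignored):

// Rough estimate of the in-memory footprint of combined messages.
// All numbers are hypothetical placeholders; plug in your own.
public class CombinedMessageFootprint {
    public static void main(String[] args) {
        long verticesPerWorker = 1_000_000_000L; // e.g., 1B vertices held by one worker
        int idBytes = 8;                         // e.g., a LongWritable vertex id
        int messageBytes = 8;                    // e.g., a DoubleWritable message
        // memory ~= #ofVertices * (SizeOfVertexId + SizeOfOneMessage)
        long bytes = verticesPerWorker * (idBytes + messageBytes);
        System.out.printf("~%.1f GB of combined messages stay in RAM%n", bytes / 1e9);
    }
}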

I do not suggest disabling the message combiner, as that would reduce performance further. Rather, I suggest increasing the number of partitions per machine. If you still see the issue (meaning that the RAM is too small to hold even one message per vertex), you can create a JIRA, mention the exact numbers in your case (e.g., size of message, size of vertex ID, size of RAM, so there is a justification for the feature) and assign it to me, so that I can add an option to offload messages even when combiners are used.
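
As a starting point, here is a minimal sketch of raising the partition count (option names assume Giraph 1.2, the values are placeholders, and the same keys can of course be passed on the command line with -ca):

import org.apache.giraph.conf.GiraphConfiguration;

// Sketch only: more (and therefore smaller) partitions let the out-of-core
// engine offload graph data at a finer granularity, leaving more RAM for the
// combined messages that stay in memory. Values below are placeholders.
public class PartitionCountSketch {
    public static void main(String[] args) {
        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setInt("giraph.userPartitionCount", 1200);           // total partitions for the job
        conf.setInt("giraph.minPartitionsPerComputeThread", 300); // or raise the per-thread minimum you already set
    }
}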

Also, I encourage you not to use the "isStaticGraph" option until it is completely fixed and tested.

Best,
Hassan

On Mon, Nov 14, 2016 at 9:44 AM, Denis Dudinski <denis.dudinski@gmail.com> wrote:
Hello,

We are using the OutOfCore functionality to perform computations over a
huge graph (billions of nodes).

Recently we faced a situation in which all our workers were stuck doing
nothing except performing System.gc(), triggered by Giraph's
ThresholdBasedOracle. The intriguing point was that no memory was
freed at all by each gc. At the same time our memory consumption level
was above highMemoryPressure, and the only commands the Oracle could give
to the IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
However, there were NO partitions, messages, or buffers available for
offloading.

We looked into the state of the MetaPartitionManager and discovered that,
according to its state matrix, all unprocessed partitions had already
been spilled to disk, as had their messages. But there was no message
data stored on disk. After a little more digging we discovered that our
RAM was almost entirely consumed by incoming messages held in a
OneMessagePerVertexStore instance. Then we looked into
DiskBackedMessageStore and found out that it simply does not offload
any incoming message data when a message combiner is used (please see
org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).
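
Roughly, the behavior we observed boils down to the sketch below (a simplified paraphrase for illustration, NOT the actual Giraph source; the helper name is hypothetical):

import java.io.IOException;

// Simplified paraphrase of what we observed in DiskBackedMessageStore:
// when a message combiner is configured, nothing is offloaded to disk.
class CombinedMessageOffloadSketch {
    private final boolean useMessageCombiner;

    CombinedMessageOffloadSketch(boolean useMessageCombiner) {
        this.useMessageCombiner = useMessageCombiner;
    }

    long offloadPartitionData(int partitionId) throws IOException {
        if (useMessageCombiner) {
            return 0; // combined messages stay in memory; no bytes written to disk
        }
        return spillMessagesToDisk(partitionId); // non-combiner path does offload
    }

    // Hypothetical placeholder for the real serialization logic.
    private long spillMessagesToDisk(int partitionId) throws IOException {
        return 0;
    }
}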

This situation can be reproduced easily using a big enough graph and two
workers with a small amount of RAM and OOC enabled (and configured
properly). Even with a combiner, which leaves only one message per
vertex, the number of partitions and vertices can be too big to hold
the incoming message data entirely in memory.

Can we somehow work around this limitation without disabling the Combiner?

Our test computation config looks like this:

hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation \
-mc com.prototype.di.pr.PageRankMasterCompute \
-yj pr-job-jar-with-dependencies.jar \
-vif com.prototype.di.pr.input.HBLongVertexInputFormat \
-vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
-op /user/hadoop/output/pr_test \
-w 2 \
-c com.prototype.di.pr.PRDoubleCombiner \
-wc com.prototype.di.pr.PageRankWorkerContext \
-ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
-ca giraph.logLevel=info \
-ca hbase.mapreduce.inputtable=di_test \
-ca hbase.mapreduce.scan.columns=di:n \
-ca hbase.defaults.for.version.skip=true \
-ca hbase.table.row.textkey=false \
-ca giraph.yarn.task.heap.mb=10000 \
-ca giraph.isStaticGraph=true \
-ca giraph.SplitMasterWorker=false \
-ca giraph.oneToAllMsgSending=true \
-ca giraph.metrics.enable=false \
-ca giraph.jmap.histo.enable=false \
-ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable \
-ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable \
-ca giraph.addDebugOpt=true \
-ca giraph.useOutOfCoreGraph=true \
-ca giraph.waitForPerWorkerRequests=true \
-ca giraph.maxNumberOfUnsentRequests=1000 \
-ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.PagesFromSameDomainLimiter \
-ca giraph.pr.di.maxPagesFromSameDomain=-1 \
-ca giraph.useInputSplitLocality=true \
-ca hbase.mapreduce.scan.cachedrows=1000 \
-ca giraph.minPartitionsPerComputeThread=150 \
-ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.DomainAwareGraphPartitionerFactory \
-ca giraph.numInputThreads=1 \
-ca giraph.inputSplitSamplePercent=1 \
-ca giraph.pr.maxNeighborsPerVertex=256 \
-ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition \
-ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
-ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
-ca giraph.numComputeThreads=2 \
-ca giraph.memory.failPressure=0.6 \
-ca giraph.memory.emergencyPressure=0.575 \
-ca giraph.memory.highPressure=0.55 \
-ca giraph.memory.optimalPressure=0.525 \
-ca giraph.memory.lowPressure=0.5

Thank you in advance.

Best Regards,
Denis Dudinski