giraph-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Denis Dudinski <denis.dudin...@gmail.com>
Subject Re: Computation hangs when Combiner is set and OOC is enabled
Date Tue, 15 Nov 2016 09:28:22 GMT
Hi Hassan,

Thank you very much for the quick response!

Can increasing the number of partitions really help in this case? All
partition data needed for calculations is already offloaded to disk, so
there are no partitions in memory. OneMessagePerVertexStore contains
vertexes for ALL partitions in memory (so it's size is similar to combined
sizes of all offloaded partitions in our case, vertex id is long and value
is double) and is not going to offload them with the increase of partitions
number.

Am I missing some point?

We also tried to run computation with the same settings but* without
combiner* and the computation got stuck too with the same symptoms. In this
case I think the reason may be in inconsistency between settings
giraph.minPartitionsPerComputeThread (150 in our case) and
giraph.flushBufferSize (8 MB by default). It seems that
org.apache.giraph.ooc.data.DiskBackedDataStore#getCandidateBuffersToOffload
can't choose any of partitions for offloading since their estimated buffer
sizes are below 8 MB.
Maybe the buffer data size estimation algorithm for each partition buffer
is too optimistic. According to heap dump one of partitions estimated size
in "left" MutablePair fiels is equal to 3679863 bytes (roughly 3,5 MB)
while heap dump shows for MutablePair's "right" field retained size of
25 444 296 (right java.util.ArrayList @ 0x551038578 25 444 296), which is
roughly 24 MB. It is almost 7 times bigger than estimated. Total size of
retained heap for org.apache.giraph.ooc.data.DiskBackedMessageStore is
equal to 5 812 238 448 bytes.

I think one more thing worth mentioning is our usage scenario. While we are
interested in quick computation, what is far more important to us is the
computation liveness. We can wait for the computation to complete for
several days or even week or two, if needed, but we must be sure it can
complete.

One more observation (maybe little bit offtopic): should we really have
reference to giraph configuration in each vertex instance? As I understand
config is immutable at times when it is injected into vertexes. Ref to
config consumes 8 bytes (x64) of memory for each object. Maybe the ref can
be replaced with static call to some static or thread local config holder
object?

Thank you once more!

Best Regards,
Denis Dudinski

2016-11-14 23:57 GMT+03:00 Hassan Eslami <hsn.eslami@gmail.com>:

> Hi Denis,
>
> Thanks for trying out the new OOC design. As you mentioned, by using
> message combiner we only keep one message for each vertex at all time. That
> means the storage needed for messages is #ofVertices*(SizeOfVertexId +
> SizeOfOneMessage). We studied several applications that use message
> combiner and noticed that in those applications message type is rather
> simple (usually a double or a pair of double or similar types). The vertex
> ID is, also, usually simple (a long value in most cases). That means per
> vertex we are keeping only 16-20 bytes for its message. We could offload
> (and in fact, it is very simple to) offload messages in case of having
> message combiners, but, we noticed that we could achieve a much better
> performance if we do not do so. In other words, it was intentional to not
> offload messages when message combiner is used. Although, message flow
> control is still in effect (and much needed) even with message combiners.
>
> I do not suggest you disable message combiner, as it further reduces the
> performance. Rather, *I suggest you increase the number of partitions per
> machine*. If you still see the issue (meaning that the RAM is too small
> to even hold one message per vertex), you can create a JIRA, mention
> exactly the numbers you have in your case (e.g., size of message, size of
> vertex id, size of RAM so there is a justification for the feature) and
> assign it to me, so that I add the option to offload the messages even when
> combiners are used.
>
> Also, I encourage you to not use the "isStaticGraph" option until it is
> completely fixed and tested.
>
> Best,
> Hassan
>
> On Mon, Nov 14, 2016 at 9:44 AM, Denis Dudinski <denis.dudinski@gmail.com>
> wrote:
>
>> Hello,
>>
>> We are using OutOofCore functionality to perform computations over
>> huge graph (billions of nodes).
>>
>> Recently we have faced a situation when all our workers stuck doing
>> nothing except performing System.gc() triggered from Giraph's
>> ThresholdBasedOracle. The intriguing point was that no memory was
>> freed at all at each gc. At the same time our memory consumption level
>> was above highMemoryPressure and all commands that Oracle could give
>> to IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
>> However, there was NO partitions, messages or buffers available for
>> offloading.
>>
>> We looked into state of the MetaPartitionManager and discovered that
>> according to state matrix within it all unprocessed partitions are
>> already spilled to disk as well as their messages. But there were no
>> data for messages stored on disk. A little bit more struggle and we
>> discovered that our RAM space was almost entirely consumed by incoming
>> messages placed in OneMessagePerVertexStore instance. Then we looked
>> into DiskBackedMessageStore and found out that it just don't offloads
>> any incoming message data when we use message combiner (please see
>> org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
>> and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).
>>
>> This situation can be reproduced easily using big enough graph and two
>> workers with small amount of RAM and OOC enabled (and configured
>> properly). Even with combiner, which leaves only one message per
>> vertex, number of partitions and vertices can be too big to hold
>> incoming message data entirely in memory.
>>
>> Can we somehow work around such limitation and NOT disable Combiner?
>>
>> Our test computation config looks like this:
>>
>> hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar
>> org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation
>> \
>> -mc com.prototype.di.pr.PageRankMasterCompute \
>> -yj pr-job-jar-with-dependencies.jar \
>> -vif com.prototype.di.pr.input.HBLongVertexInputFormat \
>> -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
>> -op /user/hadoop/output/pr_test \
>> -w 2 \
>> -c com.prototype.di.pr.PRDoubleCombiner \
>> -wc com.prototype.di.pr.PageRankWorkerContext \
>> -ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
>> -ca giraph.logLevel=info \
>> -ca hbase.mapreduce.inputtable=di_test \
>> -ca hbase.mapreduce.scan.columns=di:n \
>> -ca hbase.defaults.for.version.skip=true \
>> -ca hbase.table.row.textkey=false \
>> -ca giraph.yarn.task.heap.mb=10000 \
>> -ca giraph.isStaticGraph=true \
>> -ca giraph.SplitMasterWorker=false \
>> -ca giraph.oneToAllMsgSending=true \
>> -ca giraph.metrics.enable=false \
>> -ca giraph.jmap.histo.enable=false \
>> -ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable
>> \
>> -ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable
>> \
>> -ca giraph.addDebugOpt=true \
>> -ca giraph.useOutOfCoreGraph=true \
>> -ca giraph.waitForPerWorkerRequests=true \
>> -ca giraph.maxNumberOfUnsentRequests=1000 \
>> -ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.Page
>> sFromSameDomainLimiter
>> \
>> -ca giraph.pr.di.maxPagesFromSameDomain=-1 \
>> -ca giraph.useInputSplitLocality=true \
>> -ca hbase.mapreduce.scan.cachedrows=1000 \
>> -ca giraph.minPartitionsPerComputeThread=150 \
>> -ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.Doma
>> inAwareGraphPartitionerFactory
>> \
>> -ca giraph.numInputThreads=1 \
>> -ca giraph.inputSplitSamplePercent=1 \
>> -ca giraph.pr.maxNeighborsPerVertex=256 \
>> -ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition
>> \
>> -ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
>> -ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
>> -ca giraph.numComputeThreads=2 \
>> -ca giraph.memory.failPressure=0.6 \
>> -ca giraph.memory.emergencyPressure=0.575 \
>> -ca giraph.memory.highPressure=0.55 \
>> -ca giraph.memory.optimalPressure=0.525 \
>> -ca giraph.memory.lowPressure=0.5
>>
>> Thank you in advance.
>>
>> Best Regards,
>> Denis Dudinski
>>
>
>

Mime
View raw message