giraph-dev mailing list archives

From Avery Ching <ach...@apache.org>
Subject Re: Review Request: Out-of-core messages
Date Fri, 20 Jul 2012 23:43:07 GMT
Thanks everyone for caring so much about this!  In my opinion both this 
and the out-of-core vertices are very important improvements.  Some 
algorithms send large messages around, or multiple messages, and this can 
dominate the memory usage.  Other algorithms have a giant graph that 
dominates the memory usage.  This patch seems to be a really good first 
step in addressing the messages part, and it can be turned on/off 
(off by default, I think).  We can iterate further as time proceeds.

When I ran the PageRankBenchmark for the Hadoop Summit results, I saw 
that messages were the primary issue.  Each vertex had 10 edges but only 
one vertex value.  I was sending 10 messages per vertex, each carrying a 
vertex value, so it made sense that message memory dominated in this case.
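
To make that concrete, here is a rough back-of-envelope sketch; the vertex
count and value size below are made-up numbers for illustration, not the
actual benchmark configuration:

public class MessageMemoryEstimate {
  public static void main(String[] args) {
    // Hypothetical numbers, only to illustrate why message memory dominates:
    // 10 messages per vertex, each roughly the size of one vertex value.
    long numVertices = 100_000_000L;   // assumed graph size
    int messagesPerVertex = 10;        // one message per out-edge
    int bytesPerValue = 8;             // e.g. a double vertex value
    long vertexValueBytes = numVertices * bytesPerValue;
    long messageBytes = numVertices * (long) messagesPerVertex * bytesPerValue;
    System.out.println("vertex values: ~" + (vertexValueBytes >> 20) + " MB");
    System.out.println("messages:      ~" + (messageBytes >> 20) + " MB");
  }
}

With 10 edges per vertex, the in-flight message buffers end up roughly an
order of magnitude larger than the vertex values themselves.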

Avery

On 7/20/12 1:42 PM, Eli Reisman wrote:
> That makes a ton of sense, great ideas. What we are seeing in the metrics
> is that the three-way load of
>
> 1. reading InputSplits from HDFS (mostly over the wire, as there is no
> locality right now)
> 2. creating temporary collections of vertices and sending them out on Netty
> 3. simultaneously receiving collections of vertices on Netty from remote
> nodes, to be placed in the local workers' partitions for the processing
> stages
>
> is what overloads us on INPUT_SUPERSTEP. Once the calculations begin, there
> are far fewer collections of vertices going out on the wire, even under
> dynamic repartitioning, so all we see is messages on the wire, and no more
> InputSplit memory wasted. The usage even under load drops way down for the
> rest of the run, and a huge proportion of my total runtimes are spent in
> INPUT_SUPERSTEP.
>
> Everything you said seems valid in my opinion, but that does assume memory
> is the issue during calculation.
>
> What we see here is that the in-memory supersteps don't seem to overload us
> (testing on more algorithms would help confirm this), and because they are
> in memory and run fast they also don't take much time. Even local
> disk IO is slow and has overhead of its own.
>
> I thought the same things you guys do, until I saw Jakob's metrics patch
> reveal what's happening during a run in such detail. It still surprises me.
> But I can tell you the results here have been awfully consistent for us over
> varying loads.
>
> On the other hand, this could scale to be a memory problem in the future. I
> think my vote here is to add this stuff as the need arises, and that comes
> from trying to help new users try Giraph here, and having them complain
> about stability and about getting their code to behave as it should under
> different loads and different # of workers.
>
> So my instinct right now based on that feedback is more simplicity, and
> fewer moving parts where possible. One could easily argue it's too soon for
> that and now is the time to experiment with the codebase and make
> productive discoveries too! ;)
>
> On Fri, Jul 20, 2012 at 1:20 PM, Claudio Martella <
> claudio.martella@gmail.com> wrote:
>
>> Nice discussion.
>>
>> I'm totally with Maja on this one. The problem with the
>> INPUT_SUPERSTEP is connected with the other JIRA where the graph is
>> spilled to disk, and I personally don't see how messages should affect
>> it. In fact I believe the problem with the INPUT_SUPERSTEP happens
>> with big graphs, mostly when keeping the graph in memory (nearly)
>> fills the memory available.
>> Also, the impact of out-of-core messages really depends on the
>> algorithm and its semantics w.r.t. message creation.
>> I had some algorithms that passed the INPUT_SUPERSTEP smoothly, but
>> failed in the subsequent supersteps due to the messages (e.g. with a
>> very dense graph).
>>
>> That being said, I agree tests should be run systematically. When I
>> designed the out-of-core messages strategy presented in the JIRA and
>> implemented by Maja, it was meant (and still is) to be part of a
>> paper. For this reason I'm pretty happy to start discussing a
>> testing/benchmarking strategy.
>>
>> 1) we should run tests on two algorithms, e.g. pagerank and SSSP. They
>> are the algorithms used in the original Pregel paper and they are two
>> very different algorithms as far as messaging patterns are concerned.
>>
>> 2) we should run tests with a fixed-size graph, with a varying threshold
>> (from 0% to 100% out-of-core messages). We know the amount of messages
>> being sent per superstep by each worker, which should give us the
>> baseline.
>> This should show the cost of IO. I expect the cost, in time, to depend
>> mostly on the read and write performance of the disks, as we're
>> reading and writing sequentially, plus the cost of sorting. We should
>> show that.
>>
>> 3) The number of workers should not affect the cost of out-of-core
>> messages. The only intuitive advantage is that by fixing the graph
>> size and increasing the number of workers, fewer vertices are
>> assigned to each worker and therefore fewer messages are spilled to disk
>> per worker. This is pretty intuitive and shouldn't really require
>> intensive testing.
>>
>> We could use the recently contributed graph generator to produce the
>> input graph.
>>
>> What do you guys think?
>>
>>
>> On Fri, Jul 20, 2012 at 3:44 PM, Maja Kabiljo <majakabiljo@fb.com> wrote:
>>>
>>>> On July 19, 2012, 5:02 p.m., Eli Reisman wrote:
>>>>> Hi Maja,
>>>>>
>>>>> This was a lot of really hard work, great job. My general discomfort
>> with adopting this too quickly is that it's a big change that adds a lot of
>> new moving parts, and needs to be extensively tested in two ways:
>>>>> 1. On real clusters, with varying data loads. Testing in pseudo mode
>> for a change this big doesn't tell us if it helps or hurts us. This does
>> involve (potentially) a lot of IO which adds overhead.
>>>>> 2. Tested on algorithms that mutate the graph during compute() super
>> steps, so that we can objectively measure what's going on when that case
>> comes up.
>>>>> My main point: I would be a lot more comfortable with this and the
>> other patch that spills partitions to disk (also really great code) if
>> anyone writing these were collecting some metrics and addressing the fact
>> that, at very acceptable levels of scale-out, we are not having a memory
>> problem at any time but INPUT_SUPERSTEP. If we're not focused on
>> that, we are fixing a lot of stuff that has not been proven broken yet.
>> The metrics all clearly show, during real cluster jobs on a wide variety of
>> data load sizes, that the critical moment is superstep -1, when the
>> data is loaded and collections of vertices are sent out over Netty to their
>> remote homes (partitions) in preparation for the calculation supersteps.
>>>>> A broad fix to this would be placing the mappers/workers on
>> cluster nodes where a replica of the data they read is stored (as in
>> Hadoop, restoring locality), or spilling to disk during this phase
>> only, when it's easy since no processing is going on and the data can
>> easily be re-loaded once the splits are done and the memory pressure has
>> receded. For the rest of the processing cycle, they should be fine. As we
>> scale out further under the same memory constraints, we could add more
>> creative spilling techniques if needed, once the INPUT_SUPERSTEP stuff was
>> proven stable. I don't mean to rain on the parade, but this really seems
>> like a sensible way to go forward.
>>>>> Regardless, everyone working on disk spill code has done really fine
>> work, and I admire it. If we adapt it to rein in the scope a bit, or back
>> these ideas with some realistic testing/metrics, I'll be your biggest
>> supporter!
>>> Hi Eli,
>>>
>>> Thanks a lot for looking into this.
>>>
>>> I totally agree that I need to do more testing and measurements for this
>> solution; that was my plan from the beginning. I uploaded the patch now
>> since it's big and I wanted to parallelise benchmarking and getting
>> some comments on it. Sorry if that's the wrong way of doing it.
>>> I think whether or not messages are going to cause a memory problem
>> strongly depends on the algorithm being run. The way I see it, there are
>> cases in which you won't have problems with the graph (not even during the
>> input superstep) but you'll have them with messages. I do understand what
>> you are saying, and Alessandro is already trying to address the input
>> superstep problem, but I think this is also good to have.
>>> Note that out-of-core messaging is implemented as an option, i.e. users
>> can still choose not to use it, and in that case the way the whole system
>> works hasn't changed much (in terms of performance).
>>> As for 2., graph mutations don't have a big influence on messaging, apart
>> from the creation of nonexistent vertices which have received messages. Am I
>> missing something?
>>> I'll be working on providing some metrics and will update when I do so.
>>>
>>>
>>> - Maja
>>>
>>>
>>> -----------------------------------------------------------
>>> This is an automatically generated e-mail. To reply, visit:
>>> https://reviews.apache.org/r/6013/#review9285
>>> -----------------------------------------------------------
>>>
>>>
>>> On July 19, 2012, 12:09 p.m., Maja Kabiljo wrote:
>>>> -----------------------------------------------------------
>>>> This is an automatically generated e-mail. To reply, visit:
>>>> https://reviews.apache.org/r/6013/
>>>> -----------------------------------------------------------
>>>>
>>>> (Updated July 19, 2012, 12:09 p.m.)
>>>>
>>>>
>>>> Review request for giraph.
>>>>
>>>>
>>>> Description
>>>> -------
>>>>
>>>> This patch introduces out-of-core message support for Giraph. Some
>> ideas are taken from the discussion in
>> https://issues.apache.org/jira/browse/GIRAPH-45.
>>>> We keep messages in a MessageStore, from which they will be flushed to
>> disk when necessary. Messages are separated by partition. At any moment we
>> only flush a single partition to disk, so we still keep things in memory in
>> case the next superstep starts soon. When flushing to disk, we write in
>> the following format:
>>>> numberOfVertices
>>>> vertexId1 numberOfMessages1 messagesForVertexId1
>>>> vertexId2 numberOfMessages2 messagesForVertexId2
>>>> ...
>>>> Vertex ids are sorted. We don't require enough memory to fit all the
>> messages for a partition, but we do require that the messages for a single
>> vertex fit in memory.
>>>> In the end we potentially have several files for each partition. When
>> reading messages, all the files are read sequentially.
>>>> DiskBackedMessageStoreByPartition handles all messages,
>> DiskBackedMessageStore is then used for a single partition, and
>> SequentialFileMessageStore handles a single file.
>>>> There is also SimpleMessageStore which doesn't use disk at all.
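>>>>
>>>> As a rough illustration of the on-disk layout above (this is not the
>>>> actual patch code; the class name, method and Writable message type are
>>>> just assumptions for the sketch), writing one partition's sorted map of
>>>> messages could look like:
>>>>
>>>> import java.io.DataOutput;
>>>> import java.io.IOException;
>>>> import java.util.List;
>>>> import java.util.Map;
>>>> import java.util.SortedMap;
>>>> import org.apache.hadoop.io.DoubleWritable;
>>>> import org.apache.hadoop.io.LongWritable;
>>>>
>>>> public class PartitionMessageWriterSketch {
>>>>   /**
>>>>    * Writes: numberOfVertices, then for each vertex id in sorted order:
>>>>    * vertexId, numberOfMessages, messagesForVertexId.
>>>>    */
>>>>   public static void writePartition(
>>>>       DataOutput out,
>>>>       SortedMap<LongWritable, List<DoubleWritable>> messages)
>>>>       throws IOException {
>>>>     out.writeInt(messages.size());                   // numberOfVertices
>>>>     for (Map.Entry<LongWritable, List<DoubleWritable>> e :
>>>>         messages.entrySet()) {
>>>>       e.getKey().write(out);                         // vertexId
>>>>       out.writeInt(e.getValue().size());             // numberOfMessages
>>>>       for (DoubleWritable m : e.getValue()) {
>>>>         m.write(out);                                // one message
>>>>       }
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> Reading back walks the same fields sequentially, which matches the
>>>> sorted, sequential access pattern described above.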
>>>>
>>>> Options available to the user:
>>>> - whether or not to use out-of-core messaging
>>>> - the number of messages to keep in memory - this should probably be
>> changed (explained below)
>>>> - the size of the buffer used when reading from and writing to disk
>>>>
>>>> ServerData now has two message store instances: one which is
>> consumed in the current superstep with messages from the previous
>> superstep, and one which collects incoming messages for the next superstep.
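>>>>
>>>> A minimal sketch of that double-buffering idea (the class and field
>>>> names below are hypothetical, not the actual ServerData code):
>>>>
>>>> // Illustration only: two stores swapped at each superstep boundary.
>>>> class DoubleBufferedStores<S> {
>>>>   private S currentMessages;   // read by compute() in this superstep
>>>>   private S incomingMessages;  // filled by requests for the next superstep
>>>>
>>>>   /**
>>>>    * At the superstep boundary, last superstep's incoming messages become
>>>>    * the current ones, and a fresh empty store receives new traffic.
>>>>    */
>>>>   void prepareSuperstep(S emptyStore) {
>>>>     currentMessages = incomingMessages;
>>>>     incomingMessages = emptyStore;
>>>>   }
>>>> }
>>>>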
>>>> Other things which had to be changed:
>>>> - Checkpointing - since messages are not kept in the vertex anymore,
>> they need to be stored separately.
>>>> - Partition exchange between workers - same reason as above - added
>> SendMessagesRequest
>>>> - Messages are not assigned to the vertex; they are just passed to compute
>>>> - compute methods are now executed in order of vertex id inside a
>> partition, so that reading from disk is fast
>>>> For the memory check I only track the number of messages which I allow in
>> memory. This should be done better, but there is a problem since
>> Alessandro's patch for the out-of-core graph also has memory checks. We
>> don't want one of those parts to use all the memory and leave too little
>> space for the other, but I'm not aware of a way to separately check the
>> memory usage of different data structures.
>>>> I didn't integrate this with RPC; that's why there are some checks for
>> useNetty, which can be removed once RPC is removed. Also, since the vertex
>> doesn't keep messages in itself anymore, once RPC is removed we should also
>> remove getMessages/putMessages/getNumMessages from the vertex, change
>> initialize to (id, value, edges, hasMessages) and just give messages to the
>> vertex when calling compute.
>>>> I'll fix the part where partitions are sent around before a superstep,
>> since that's the only part now which requires that all the messages for a
>> single partition fit in memory.
>>>>
>>>> This addresses bug GIRAPH-45.
>>>>      https://issues.apache.org/jira/browse/GIRAPH-45
>>>>
>>>>
>>>> Diffs
>>>> -----
>>>>
>>>>
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendMessagesRequest.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 1363291
>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java PRE-CREATION
>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1363291
>>>> Diff: https://reviews.apache.org/r/6013/diff/
>>>>
>>>>
>>>> Testing
>>>> -------
>>>>
>>>> Ran mvn verify and tests in pseudo-distributed mode; all pass apart from
>> this one: https://issues.apache.org/jira/browse/GIRAPH-259.
>>>>
>>>> Thanks,
>>>>
>>>> Maja Kabiljo
>>>>
>>>>
>>
>>
>> --
>>     Claudio Martella
>>     claudio.martella@gmail.com
>>

