giraph-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claudio Martella <claudio.marte...@gmail.com>
Subject Re: Review Request: Out-of-core messages
Date Sat, 21 Jul 2012 13:04:33 GMT
To be honest, I don't see the connection between the problem with the
INPUT_SUPERSTEP and this patch, and the reason why we shouldn't
proceed in parallel. The former has to be improved and I welcome Eli's
work on that. Also, I still don't understand exactly the issue there.
This patch on the other side does help new people get to giraph as
only a very few users have big clusters that spawn mappers with > 2GB
heap.
Having the possibility to run giraph on small cluster by going
out-of-core will help new users adopt Giraph, as we've seen this
request popping up very often on the mailing list.
As Avery and Maja already said, this can be turned off by default,
which is the most reasonable setting to me.

On Sat, Jul 21, 2012 at 1:43 AM, Avery Ching <aching@apache.org> wrote:
> Thanks everyone for caring so much about this!  In my opinion both this and
> the out-of-core vertices are very important improvements.  Some algorithms
> send large messages around or multiple messages and this can dominate the
> memory usage.  Some algorithms have a giant graph that dominate the memory
> usage.  This patch seems to be a really good first step in trying to address
> the messages part and can be turned on/off (off by default I think).  We can
> iterate further as time proceeds.
>
> When I ran the PageRankBenchmark for the Hadoop Summit results, I saw that
> messages were the primary issue.  Each vertex had 10 edges, but one vertex
> value.  I was sending 10 messages per vertex, each with a vertex value, so
> it made sense that the message memory dominates in this case.
>
> Avery
>
>
> On 7/20/12 1:42 PM, Eli Reisman wrote:
>>
>> That makes a ton of sense, great ideas. What we are seeing in the metrics
>> is the three-way load of
>>
>> 1. reading InputSplits from HDFS (mostly over the wire as there is no
>> locality right now)
>> 2. creating temporary collections of vertices, sending them on netty
>> 3. simultaneously receiving collections of vertices on netty from remote
>> nodes that will be place in the local workers' partitions for processing
>> stages
>>
>> is what overloads us on INPUT_SUPERSTEP. once the calculations begin,
>> there
>> are a lot less collections of vertices going out on the wire, even under
>> dynamic repartitioning so all we see is messages on the wire, and no more
>> InputSplit memory wasted. The usage even under load drops way down for the
>> rest of the run, and a huge proportion of my total runtimes are spent in
>> INPUT_SUPERSTEP.
>>
>> Everything you said seems valid in my opinion, but that does assume memory
>> is the issue during calculation.
>>
>> What we see here is: the super steps in memory don't seem to overload us
>> (testing on more algorithms would help confirm this) and because they are
>> in memory and run fast they also don't seem to take much time. Even local
>> disk IO is slow and has overhead of its own.
>>
>> I thought everything you guys do, until I saw Jakob's metrics patch reveal
>> whats happening during a run in such detail. It still surprises me. But I
>> can tell you the results here have been awfully consistent for us over
>> varying loads.
>>
>> On the other hand, this could scale to be a memory problem in the future.
>> I
>> think my vote here is add this stuff as the need arises, and that comes
>> from trying to help new users try Giraph here, and having them complain
>> about stability and getting their code to behave as it should under
>> different loads, different # of workers.
>>
>> So my instinct right now based on that feedback is more simplicity, and
>> less moving parts where possible. One could easily argue its too soon for
>> that and now is the time to experiment with the codebase and make
>> productive discoveries too! ;)
>>
>> On Fri, Jul 20, 2012 at 1:20 PM, Claudio Martella <
>> claudio.martella@gmail.com> wrote:
>>
>>> Nice discussion.
>>>
>>> I'm totally with Maja on this one. The problem with the
>>> INPUT_SUPERSTEP is connected with the other JIRA where the graph is
>>> spilled to disk, and I personally don't see how messages should affect
>>> it. In fact I believe the problem with the INPUT_SUPERSTEP happens
>>> with big graphs, mostly when keeping the graph in memory (nearly)
>>> fills the memory available.
>>> Also, the impact of out-of-core messages really depends on the
>>> algorithms and its semantics, wrt to message creation.
>>> I had some algorithms that passed the INPUT_SUPERSTEP smoothly, but
>>> failed in the upcoming supersteps due to the messages (e.g. with a
>>> very dense graph).
>>>
>>> That being said, I agree tests should be run systematically. When I
>>> designed the out-of-core messages strategy presented in the JIRA and
>>> implemented by Maja, it was meant (and still is) to be part of a
>>> paper. For this reason I'm pretty happy to start discussing a
>>> testing/benchmarking strategy.
>>>
>>> 1) we should run tests on two algorithms, e.g. pagerank and SSSP. They
>>> are the algorithms used in the original Pregel paper and they are two
>>> very different algorithms as far as messaging patterns are concerned.
>>>
>>> 2) we should run tests with a fixed size graph, with varying threshold
>>> (from 0% to 100% out-of-core messages). We know the amount of messages
>>> being sent per superstep by each worker, that should give us the
>>> baseline.
>>> This should show the cost of IO. I expect the cost, in time, to be
>>> mostly depending on the write and read performance of disks, as we're
>>> reading and writing sequentially, plus the cost of sorting. We should
>>> show that.
>>>
>>> 3) The number of workers should not affect the cost of out-of-core
>>> messages. The only intuitive advantage is that by fixing the graph
>>> size and by increasing the number of workers, less vertices are
>>> assigned to each worker and therefore less messages are spillt to disk
>>> per worker. This is pretty intuitive and shouldn't really require
>>> intensive testing.
>>>
>>> We could use the graph generator which has been contributed recently
>>> as an input graph.
>>>
>>> What do you guys think?
>>>
>>>
>>> On Fri, Jul 20, 2012 at 3:44 PM, Maja Kabiljo <majakabiljo@fb.com> wrote:
>>>>
>>>>
>>>>> On July 19, 2012, 5:02 p.m., Eli Reisman wrote:
>>>>>>
>>>>>> Hi Maja,
>>>>>>
>>>>>> This was a lot of really hard work, great job. My general discomfort
>>>
>>> with adopting this too quickly is this is a big change that adds a lot of
>>> new moving parts, and needs to be extensively tested in two ways:
>>>>>>
>>>>>> 1. On real clusters, with varying data loads. Testing in pseudo mode
>>>
>>> for a change this big doesn't tell us if it helps or hurts us. This does
>>> involve (potentially) a lot of IO which adds overhead.
>>>>>>
>>>>>> 2. Tested on algorithms that mutate the graph during compute() super
>>>
>>> steps so that we can objectively measure whats going on when that case
>>> comes up.
>>>>>>
>>>>>> My main point: I would be a lot more comfortable with this and the
>>>
>>> other patch spilling to disk for partitions (also really great code) if
>>> anyone writing these was doing some metrics and was addressing the fact
>>> that we are not having a memory problem at very acceptable levels of
>>> scale-out during any time but INPUT_SUPERSTEP. If we're not focused on
>>> that, we are fixing a lot of stuff that has not proven to be broken yet.
>>> The metrics all clearly show during real cluster jobs on a wide variety
>>> of
>>> data load sizes that the critical moment is during super step -1 when the
>>> data is loaded and collections of vertices are sent out over Netty to
>>> their
>>> remote homes (partitions) in preparation for the calculation super steps.
>>>>>>
>>>>>> A broad fix to this would include placing the mappers/workers on
>>>
>>> cluster nodes that a replica of the data they read is stored at (as in
>>> Hadoop, restoring locality) or to do the spills to disk during this phase
>>> only, when its easy since no processing is going on and they can easily
>>> be
>>> re-loaded when the splits are done and the memory pressure has receded.
>>> For
>>> the rest of the processing cycle, they should be fine. As we scale out
>>> further under the same memory constraints, we could add more creative
>>> spilling techniques if needed and once the INPUT_SUPERSTEP stuff was
>>> proven
>>> stable. Don't mean to rain on the parade but this really seems like a
>>> sensible way to go forward?
>>>>>>
>>>>>> Regardless, everyone working on disk spill code as done really fine
>>>
>>> work, I admire it. If we adapt it to rein in the scope a bit, or back
>>> these
>>> ideas with some realistic testing/metrics, I'll be your biggest
>>> supporter!
>>>>
>>>> Hi Eli,
>>>>
>>>> Thanks a lot for looking into this.
>>>>
>>>> I totally agree that I need to do more testing and measurements for this
>>>
>>> solution, that was my plan from the beginning. I uploaded the patch now
>>> since it's big and I wanted to parallelise doing benchmarking and getting
>>> some comments about it. Sorry if that's the wrong way of doing it.
>>>>
>>>> I think whether or not messages are going to cause memory problem
>>>
>>> strongly depends on the algorithm which is run. The way I see it, there
>>> are
>>> cases in which you won't have problems with graph (not even during input
>>> superstep) but you'll have them with messages. I do understand what you
>>> are
>>> saying, Alessandro is already trying to address input superstep problem,
>>> but I think this thing is also good to have.
>>>>
>>>> Note that out-of-core messaging is implemented as an option, i.e. users
>>>
>>> can still choose not to use it, and in that case the way the whole system
>>> works haven't changed much (in terms of performance).
>>>>
>>>> As for 2., graph mutations don't have big influence on messaging, apart
>>>
>>> from creation of nonexistent vertices which received messages. Am I
>>> missing
>>> something?
>>>>
>>>> I'll be working on providing some metrics and will update when I do so.
>>>>
>>>>
>>>> - Maja
>>>>
>>>>
>>>> -----------------------------------------------------------
>>>> This is an automatically generated e-mail. To reply, visit:
>>>> https://reviews.apache.org/r/6013/#review9285
>>>> -----------------------------------------------------------
>>>>
>>>>
>>>> On July 19, 2012, 12:09 p.m., Maja Kabiljo wrote:
>>>>>
>>>>> -----------------------------------------------------------
>>>>> This is an automatically generated e-mail. To reply, visit:
>>>>> https://reviews.apache.org/r/6013/
>>>>> -----------------------------------------------------------
>>>>>
>>>>> (Updated July 19, 2012, 12:09 p.m.)
>>>>>
>>>>>
>>>>> Review request for giraph.
>>>>>
>>>>>
>>>>> Description
>>>>> -------
>>>>>
>>>>> This patch introduces out-of-core messages support for Giraph. Some
>>>
>>> ideas are taken from discussion in
>>> https://issues.apache.org/jira/browse/GIRAPH-45.
>>>>>
>>>>> We keep messages in MessageStore, from which they will be flushed to
>>>
>>> disk when necessary. Messages are separated by partition. In moments we
>>> only flush single partition to disk so we would still keep things in
>>> memory
>>> in case it's time for next superstep. When flushing to disk, we write in
>>> the following format:
>>>>>
>>>>> numberOfVertices
>>>>> vertexId1 numberOfMessages1 messagesForVertexId1
>>>>> vertexId2 numberOfMessages2 messagesForVertexId2
>>>>> ...
>>>>> Vertex ids are sorted. We don't require enough memory to fit all the
>>>
>>> messages for partition, but we require that messages for a single vertex
>>> fit in memory.
>>>>>
>>>>> In the end we potentially have several files for each partition. When
>>>
>>> reading messages, all the files are read sequentially.
>>>>>
>>>>> DiskBackedMessageStoreByPartition handles all messages,
>>>
>>> DiskBackedMessageStore is then used for a single partition, and
>>> SequentialFileMessageStore handles single file.
>>>>>
>>>>> There is also SimpleMessageStore which doesn't use disk at all.
>>>>>
>>>>> Options available to user:
>>>>> - whether or not to use out-of-core messaging
>>>>> - number of messages to keep in memory - this should probably be
>>>
>>> changed (explained below)
>>>>>
>>>>> - size of buffer when reading from and writing to disk
>>>>>
>>>>> ServerData now has two instances of message stores: one which is
>>>
>>> consumed in current superstep with messages from previous superstep, and
>>> one in which it will keep incoming messages for next superstep.
>>>>>
>>>>> Other things which had to be changed:
>>>>> - Checkpointing - since messages are not kept in the vertex anymore,
>>>
>>> they need to be stored separately.
>>>>>
>>>>> - Partition exchange between workers - same reasons as above - added
>>>
>>> SendMessagesRequest
>>>>>
>>>>> - Messages are not assigned to vertex, they are just passed in compute
>>>>> - compute methods are now executed in order of vertex id inside of
>>>
>>> partition, so we could have fast reading from disk
>>>>>
>>>>> For memory check I only have the number of messages which I allow in
>>>
>>> memory. This should be done better, but there is a problem since
>>> Alessandro's patch for out-of-core graph also has memory checks. We don't
>>> want one of those parts to use all the memory and leave too little space
>>> for the other, but I'm not aware of a way to separately check memory
>>> usage
>>> of different data structures.
>>>>>
>>>>> I didn't integrate this with RPC, that's why there are some checks for
>>>
>>> useNetty, those can be removed once the RPC is removed. Also, since
>>> vertex
>>> doesn't keep messages in itself anymore, once RPC is removed we should
>>> also
>>> remove getMessages/putMessages/getNumMessages from vertex, change
>>> initialize to (id, value, edges, hasMessages) and just give messages to
>>> vertex when calling compute.
>>>>>
>>>>> I'll fix the part when partitions are sent around before superstep,
>>>
>>> since that's the only part now which requires that all the messages for
>>> single partition fit in memory.
>>>>>
>>>>>
>>>>> This addresses bug GIRAPH-45.
>>>>>      https://issues.apache.org/jira/browse/GIRAPH-45
>>>>>
>>>>>
>>>>> Diffs
>>>>> -----
>>>>>
>>>>>
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendMessagesRequest.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java1363291
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.javaPRE-CREATION
>>>
>>> http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java1363291
>>>>>
>>>>> Diff: https://reviews.apache.org/r/6013/diff/
>>>>>
>>>>>
>>>>> Testing
>>>>> -------
>>>>>
>>>>> Run mvn verify and tests in pseudo-distributed mode, all apart from
>>>
>>> this one https://issues.apache.org/jira/browse/GIRAPH-259 pass.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Maja Kabiljo
>>>>>
>>>>>
>>>
>>>
>>> --
>>>     Claudio Martella
>>>     claudio.martella@gmail.com
>>>
>



-- 
   Claudio Martella
   claudio.martella@gmail.com

Mime
View raw message