giraph-dev mailing list archives

From Eli Reisman <>
Subject Re: Review Request: Out-of-core messages
Date Wed, 01 Aug 2012 21:53:07 GMT
I have been working on a locality approach that does not change how we
interact with MapReduce, but I would be very interested in taking the more
"active" Hadoop-style approach Avery mentioned if it's practical, since it
is already available in Hadoop and we already use mappers for our workers.

I was hoping to change the vertex IO formats and/or job submission code and
was puzzled why this doesn't already happen. Why was this Hadoop feature
not taken advantage of originally? Is there more to this change than it
looks like? Is there something we get out of the existing approach, or
something about locality that is a problem for Giraph as it is now?

On Wed, Aug 1, 2012 at 11:43 AM, Maja Kabiljo <> wrote:

> I've been benchmarking this solution; the results are in the attached
> Excel document. It covers PageRankBenchmark and RandomMessagesBenchmark.
> Sheets 'Page Rank 3', 'Page Rank 4' and 'Messages 3' show the cases in
> which we run out of memory. The Shortest Paths algorithm uses very few
> messages compared to the amount of other data, so there I couldn't see
> any difference between the solutions.
> The interesting cases are 'Page Rank 2' and 'Messages 2', where I guess we
> are very tight on memory, so going out of core helps (I have rerun those a
> few times since and keep getting the same results). We can also see that
> execution time improves with just SimpleMessageStore, since the current
> implementation copies messages around when it stores them in the vertex.
> I also tried running RandomMessagesBenchmark with a really huge number of
> messages, but it crashed because the message store didn't process messages
> fast enough and the worker got flooded with unprocessed requests. In cases
> like that, the only thing that could help would be to slow down the
> compute executions. But I think this shouldn't happen in real
> applications: this benchmark doesn't use the received messages at all,
> whereas in a real application executions will be slower anyway if they
> have to process that much data. Still, it would be good to have a real
> problem that uses messages intensively, so we could see what's really
> going on.
> In conclusion, as a start, maybe I can create a smaller patch from this
> that only adds SimpleMessageStore, since as we can see keeping messages
> outside of vertices helps. Then, once the RPC is removed, we will finally
> be able to remove the putMessages/getMessages/getNumMessages functions
> from Vertex. As for the out-of-core part, if we still offer the option
> not to use it by default, I see no harm in adding it as well, and as you
> can see there are benefits in some cases.
> Another thing: I should explain which parts of the GIRAPH-45 discussion I
> am actually using here, since I don't use Bloom filters or B-trees. It
> works as follows:
> - Inside the outer message store we have a separate message store for
> each partition.
> - Partition message stores keep data in an ordered map (ordered by vertex
> id).
> - In the outer message store we check whether we should flush something
> (whether we have more than the allowed number of messages in memory).
> While we do, we flush the partition with the largest number of messages
> in memory.
> - When a partition message store is flushed, all its data is written to a
> file in vertex id order. The file content looks like:
> num_vertices
> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
> ...
> - In the end each partition will have some messages in memory and N
> files, where N is the number of times it was flushed.
> - When it's time to do the computation, within a single partition we call
> the compute methods in vertex id order.
> - We use buffered streams and read the data from all of a partition's
> files sequentially, since we need the data in the same order in which it
> is written in each file. This limits the number of random file accesses.
> Maja
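
The flush-and-merge scheme described in the list above can be sketched
roughly as follows. This is only an illustration under stated assumptions,
not the code from the patch: the class names PartitionStore and
OutOfCoreMessageStore, the max_in_memory parameter, the text file layout
with one vertex per line, and the use of integer vertex ids and integer
messages are all hypothetical. It is written in Python for brevity, while
Giraph itself is Java.

```python
import heapq
import os
import tempfile
from collections import defaultdict


class PartitionStore:
    """Messages of one partition: an in-memory map plus sorted spill files."""

    def __init__(self, spill_dir):
        self.spill_dir = spill_dir
        self.in_memory = defaultdict(list)  # vertex_id -> [message, ...]
        self.count = 0                      # messages currently in memory
        self.files = []                     # one file per flush

    def add(self, vertex_id, message):
        self.in_memory[vertex_id].append(message)
        self.count += 1

    def flush(self):
        # Write num_vertices, then "vertex_id num_messages msg ..." per line,
        # in vertex id order (hypothetical text layout).
        fd, path = tempfile.mkstemp(dir=self.spill_dir, text=True)
        with os.fdopen(fd, "w") as out:
            out.write(f"{len(self.in_memory)}\n")
            for vid in sorted(self.in_memory):
                msgs = self.in_memory[vid]
                out.write(" ".join(map(str, [vid, len(msgs), *msgs])) + "\n")
        self.files.append(path)
        self.in_memory.clear()
        self.count = 0

    def _read(self, path):
        # Sequential, buffered read of one spill file.
        with open(path) as f:
            for _ in range(int(f.readline())):
                fields = f.readline().split()
                vid, n = int(fields[0]), int(fields[1])
                yield vid, [int(m) for m in fields[2:2 + n]]

    def messages_by_vertex(self):
        """Yield (vertex_id, messages) in vertex id order."""
        memory = ((vid, list(self.in_memory[vid]))
                  for vid in sorted(self.in_memory))
        streams = [self._read(p) for p in self.files] + [memory]
        # Every stream is already sorted by vertex id, so a k-way merge
        # reads each file front to back exactly once.
        current, batch = None, []
        for vid, msgs in heapq.merge(*streams, key=lambda pair: pair[0]):
            if current is not None and vid != current:
                yield current, batch
                batch = []
            current = vid
            batch.extend(msgs)
        if current is not None:
            yield current, batch


class OutOfCoreMessageStore:
    """Outer store: one PartitionStore per partition, bounded memory."""

    def __init__(self, max_in_memory, spill_dir):
        self.max_in_memory = max_in_memory  # assumed to be >= 0
        self.spill_dir = spill_dir
        self.partitions = {}

    def add(self, partition_id, vertex_id, message):
        if partition_id not in self.partitions:
            self.partitions[partition_id] = PartitionStore(self.spill_dir)
        self.partitions[partition_id].add(vertex_id, message)
        # While over the limit, flush the partition holding most messages.
        while sum(p.count for p in self.partitions.values()) > self.max_in_memory:
            max(self.partitions.values(), key=lambda p: p.count).flush()
```

At compute time a worker would iterate messages_by_vertex() for each
partition and call compute on the vertices in the same order, so no file
is ever read out of order.
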
> On 7/24/12 1:45 AM, "Avery Ching" <> wrote:
> >We should integrate the partitioning of the graph into the input
> >superstep to get locality as well.  We can use MapReduce to try to
> >schedule the map tasks (workers) closest to their data and then make the
> >workers smart enough to only try to load their partitions.
> >
> >On 7/22/12 4:30 PM, Claudio Martella wrote:
> >> I see your reasoning. In general I'm open to using MR when
> >> necessary (e.g. I used to propose it instead of the automatic vertex
> >> creation), but here it could get tricky. I can see the additional HDFS
> >> usage as a downside (you have to be able to store 2x the graph).
> >> However, once the graph is pre-filtered, this additional stage would
> >> not be necessary again for successive jobs (only when a different
> >> number of workers is used). Still, it does add a not-so-small step to
> >> the process.
> >>
> >> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta <> wrote:
> >>> Exactly. On paper, the amount of data around should be the same as
> >>> during the computation, but in practice we do use a lot more memory.
> >>> You can play with the settings and just push the problem a little
> >>> farther away, by caching less and flushing requests more frequently,
> >>> so then the bottleneck is on the servers.
> >>> We're basically sending (k-1)/k of the graph through the network,
> >>> where k is the number of workers.
> >>>
> >>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
> >>> MapReduce is really good at (sorting and aggregating) in a probably
> >>> inefficient (or at least non-scalable) way.
> >>> We could try implementing it with a MapReduce job instead, where the
> >>> mappers take input splits and emit (partition_id, vertex) (they would
> >>> have access to the partitioner) and reducers just output the built
> >>> partitions to HDFS.
> >>> The computation stage would then be the usual Giraph job, where each
> >>> worker knows where to get its partitions from HDFS.
> >>> I can try making this change and see how it goes. It would just be one
> >>> MR job, so we're not selling our souls to iterative MR.
> >>>
> >>> I can also see many cases where one might not want to shuffle vertices
> >>> around at all: each worker reads a roughly equal part of the input
> >>> (forget about bigger vertices for now) and simply communicates its own
> >>> vertex ids to the master. Partition "a posteriori" instead of "a
> >>> priori".
> >>>
> >>> What do you think?
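
The pre-partitioning job proposed above, and the (k-1)/k estimate, can be
illustrated with a small in-process simulation. This is a sketch under
assumptions rather than a real MapReduce job: partition_of stands in for
whatever partitioner Giraph is configured with (a plain modulo on integer
vertex ids here), and it assumes one partition and one input split per
worker. All function names are hypothetical.

```python
from collections import defaultdict


def partition_of(vertex_id, num_partitions):
    # Stand-in partitioner (assumption): modulo on integer vertex ids.
    return vertex_id % num_partitions


def map_split(split, num_partitions):
    # Mapper: read one input split and emit (partition_id, vertex).
    for vertex_id, edges in split:
        yield partition_of(vertex_id, num_partitions), (vertex_id, edges)


def reduce_partitions(emitted):
    # Reducer side: group vertices by partition id and build each partition.
    partitions = defaultdict(list)
    for partition_id, vertex in emitted:
        partitions[partition_id].append(vertex)
    return {pid: sorted(vs, key=lambda v: v[0])
            for pid, vs in partitions.items()}


def remote_fraction(splits, num_workers):
    # Fraction of vertices whose owner differs from the worker that read
    # them, i.e. the share of the graph that has to cross the network.
    total = remote = 0
    for reader, split in enumerate(splits):
        for vertex_id, _ in split:
            total += 1
            remote += partition_of(vertex_id, num_workers) != reader
    return remote / total


# Four workers, each reading a contiguous range of four vertex ids.
k = 4
splits = [[(v, []) for v in range(w * 4, w * 4 + 4)] for w in range(k)]
emitted = [pair for split in splits for pair in map_split(split, k)]
partitions = reduce_partitions(emitted)
print(sorted(partitions))            # partition ids that were built
print(remote_fraction(splits, k))    # compare with (k - 1) / k
```

With input splits that are unrelated to the partitioner, as in this
example, each reader owns only about 1/k of what it reads, which is where
the (k-1)/k figure comes from.
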
> >>>
> >>> On 7/20/12 9:42 PM, "Eli Reisman" <> wrote:
> >>>
> >>>> What we are seeing in the metrics is the three-way load of
> >>>>
> >>>> 1. reading InputSplits from HDFS (mostly over the wire as there is no
> >>>> locality right now)
> >>>> 2. creating temporary collections of vertices, sending them on netty
> >>>> 3. simultaneously receiving collections of vertices on netty from
> >>>> remote nodes that will be placed in the local workers' partitions for
> >>>> processing stages
> >>
> >>
