giraph-dev mailing list archives

From Claudio Martella <claudio.marte...@gmail.com>
Subject Re: Multithreaded mapper in Giraph
Date Tue, 26 Jun 2012 20:29:20 GMT
MapReduce is a bit different. You usually run fewer mappers than cores
because you don't want the tasktracker and the datanode to starve. For
Giraph you'd already have multiple mappers per machine, hence multiple
workers per machine, and that already gives you the parallelism you
want. Since there is little overhead on the tasktracker and none at all
on the datanode during the computation (except for checkpointing), I
guess you could push it a bit further than MapReduce. Still, with Netty
you have many threads hanging around exchanging data, so you don't want
those to starve either.
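
As a rough sketch of that sizing argument (illustrative only, not Giraph
code; the two-core headroom is an assumed value to tune per cluster), a
worker could derive its compute thread count from the hardware threads it
sees and leave a little room for the Netty and tasktracker threads:

    // Hypothetical sizing sketch: keep a couple of hardware threads free for
    // the Netty I/O threads and the tasktracker instead of claiming them all.
    int hardwareThreads = Runtime.getRuntime().availableProcessors();
    int headroom = 2;  // assumed reserve for Netty/tasktracker threads
    int computeThreads = Math.max(1, hardwareThreads - headroom);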

Does it make sense to you?


On Tue, Jun 26, 2012 at 10:03 PM, Jan van der Lugt <janlugt@gmail.com> wrote:
> Multiple mappers per machine is of course still possible, but the overhead
> is significant if you have to run a mapper for every hardware thread. What's
> the best practice for setting up plain Hadoop on machines with many
> (let's say more than 16) hardware threads? One mapper per hardware thread?
> I can change the mapper so that the default is still single-threaded
> (simply by setting the executor's default number of threads to 1), so we
> don't break the current behavior. Simply by setting a property in the job
> configuration you would be able to make your job multithreaded. Having
> single-threaded mappers makes sense if you share your cluster with other
> types of jobs. Our Hadoop cluster, however, will be used just for graph
> analysis, so it makes sense to exploit parallelism within the mapper.
>
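A minimal sketch of the opt-in Jan describes, assuming a hypothetical
property name (giraph.computeThreads is only an illustration, not an
existing key in this thread): the mapper reads the thread count from the
job configuration and defaults to 1, which preserves the current
single-threaded behavior.

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper: jobs stay single-threaded unless they explicitly
    // raise the (illustrative) giraph.computeThreads property.
    public class ComputeThreadsOption {
      /** Illustrative property name, not an actual Giraph configuration key. */
      public static final String NUM_COMPUTE_THREADS = "giraph.computeThreads";

      public static int get(Configuration conf) {
        return conf.getInt(NUM_COMPUTE_THREADS, 1);  // 1 = current behavior
      }
    }

A job that wants the parallel path would then set the property before
submission, e.g. conf.setInt("giraph.computeThreads", 16).
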
> On Tue, Jun 26, 2012 at 10:37 AM, Jakob Homan <jghoman@gmail.com> wrote:
>
>> Right now it's because we're running on Hadoop Map-Reduce, which
>> strongly discourages multithreading within the mapper and expects one
>> to gain parallelism by running multiple mappers per machine.  In this
>> context, the overhead makes sense.  But you're right, as we move away
>> from HMR, it will make sense for us to control the multithreading
>> ourselves.
>>
>> On Tue, Jun 26, 2012 at 10:26 AM, Jan van der Lugt <janlugt@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > While I was browsing through the Giraph code, I wondered why the Giraph
>> > GraphMapper is not implemented in a multithreaded fashion. The whole
>> > model of Pregel ensures the execution is embarrassingly parallel, so the
>> > implementation should be easy. If the marked piece of code below is
>> > extracted into a FutureTask and executed by a ThreadPoolExecutor (with a
>> > user-specifiable number of threads, or chosen automatically based on the
>> > machine), the code would make use of multithreaded machines much more
>> > efficiently. If I wanted to fully utilize a dual-socket octo-core machine
>> > with hyper-threading in the current model, I would need 20-30 mappers,
>> > which implies running 20-30 JVMs, 20-30 Netty clients, etc. I propose to
>> > change the code below to make it multithreaded; I believe this can make
>> > Giraph much faster. I just wanted to know your opinions on this, and the
>> > reasons why the mapper is currently not multithreaded, before I start
>> > working on it. I'll have some performance figures later on so we can
>> > decide whether it is worthwhile to merge the code into the mainline
>> > branch.
>> >
>> > for (Partition<I, V, E, M> partition
>> >     : serviceWorker.getPartitionMap().values()) {
>> >   PartitionStats partitionStats =
>> >       new PartitionStats(partition.getPartitionId(), 0, 0, 0);
>> >   for (BasicVertex<I, V, E, M> basicVertex : partition.getVertices()) {
>> >     /* FROM HERE */
>> >     basicVertex.setGraphState(graphState);
>> >     if (basicVertex.isHalted()
>> >         & !Iterables.isEmpty(basicVertex.getMessages())) {
>> >       basicVertex.halt = false;
>> >     }
>> >     if (!basicVertex.isHalted()) {
>> >       Iterator<M> vertexMsgIt = basicVertex.getMessages().iterator();
>> >       context.progress();
>> >       basicVertex.compute(vertexMsgIt);
>> >       basicVertex.releaseResources();
>> >     }
>> >     if (basicVertex.isHalted()) {
>> >       partitionStats.incrFinishedVertexCount();
>> >     }
>> >     partitionStats.incrVertexCount();
>> >     partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
>> >     /* TO HERE */
>> >   }
>> >   partitionStatsList.add(partitionStats);
>> > }
>> >
>> > - Jan
>>
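
For reference, a minimal sketch of the kind of change Jan proposes, in a
coarser-grained form that parallelizes over partitions rather than
individual vertices. It is illustrative only, not the eventual Giraph
implementation: it reuses the identifiers from the quoted snippet, elides
the per-vertex body, and assumes a numComputeThreads value like the one
sketched earlier in the thread (classes from java.util.concurrent:
ExecutorService, Executors, Callable, Future).

    // Hypothetical sketch: submit one task per partition to a shared pool and
    // wait for all of them before collecting the per-partition stats.
    ExecutorService pool = Executors.newFixedThreadPool(numComputeThreads);
    List<Future<PartitionStats>> futures =
        new ArrayList<Future<PartitionStats>>();

    for (final Partition<I, V, E, M> partition
        : serviceWorker.getPartitionMap().values()) {
      futures.add(pool.submit(new Callable<PartitionStats>() {
        @Override
        public PartitionStats call() throws Exception {
          PartitionStats partitionStats =
              new PartitionStats(partition.getPartitionId(), 0, 0, 0);
          for (BasicVertex<I, V, E, M> basicVertex : partition.getVertices()) {
            // ... same per-vertex body as the /* FROM HERE */ ... /* TO HERE */
            // region quoted above ...
          }
          return partitionStats;
        }
      }));
    }

    // Future.get() throws checked InterruptedException/ExecutionException,
    // which the surrounding mapper has to handle or rethrow.
    for (Future<PartitionStats> future : futures) {
      partitionStatsList.add(future.get());
    }
    pool.shutdown();

Keeping the unit of work at partition granularity leaves each vertex's
compute() on a single thread, which matches the Pregel model Jan mentions;
per-vertex FutureTasks as originally suggested would also work, but add
scheduling overhead for very small units of work.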



-- 
   Claudio Martella
   claudio.martella@gmail.com
