From: Kevin Burton
To: mapreduce-user@hadoop.apache.org
Date: Tue, 27 Dec 2011 03:12:41 -0800
Subject: Re: A new map reduce framework for iterative/pipelined jobs.

> Thanks for sharing. I'd love to play with it, do you have a
> README/user-guide for sysstat?

Not a ton, but I could write some up... Basically I modeled it after
vmstat/iostat on Linux:

http://sebastien.godard.pagesperso-orange.fr/documentation.html

The theory is that most platforms have similar facilities, so drivers could
be written per platform; at runtime the platform is detected and the right
driver is used, or an 'unsupported' null object is returned which doesn't do
anything.

The output is IO, CPU and network throughput per second for the entire
run... so if the job took 5 minutes to execute it would basically be a
per-second average over that 5 minute run (see below for an example).
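To make the per-platform driver idea above concrete, here is a minimal
sketch of how platform probes plus an 'unsupported' null object could be
wired together. It assumes nothing about the real systat code: the
SystemStatReader, LinuxSystemStatReader and NullSystemStatReader names are
hypothetical, and a real Linux driver would parse /proc the way sysstat
does rather than return a canned string.

// Hypothetical sketch: pick a platform driver at runtime, or fall back
// to a null object that reports nothing on unsupported platforms.
public interface SystemStatReader {
    // One-line summary of IO / CPU / network throughput, or "" if unsupported.
    String sample();
}

class LinuxSystemStatReader implements SystemStatReader {
    @Override public String sample() {
        // A real driver would read /proc/stat, /proc/diskstats and /proc/net/dev here.
        return "cpu 2.00% | sda 100.00% util | eth0 122,918 bits rx";
    }
}

class NullSystemStatReader implements SystemStatReader {
    @Override public String sample() {
        return ""; // unsupported platform: do nothing
    }
}

final class SystemStatReaders {
    static SystemStatReader forCurrentPlatform() {
        String os = System.getProperty("os.name").toLowerCase();
        return os.contains("linux") ? new LinuxSystemStatReader()
                                    : new NullSystemStatReader();
    }
}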
Couple of questions:

> # How does peregrine deal with the case that you might not have available
> resources to start reduces while the maps are running?

If maps are not completed we can't start a reduce phase until they complete.

Right now I don't have speculative execution turned on in the builds, but
the support is there.

One *could* do the initial segment sort of the reduce, but doing the full
merge isn't really helpful, as even if ONE key changes you have to re-merge
all the IO.

The issue of running a ReduceMap, where the output of one reduce is the
input of another map, does require some coordination. Right now my plan is
to split the workload in half... so that the buffers are just 50% of their
original values, since they both have to be in play.

I'm not using the Java heap's memory but instead mmap(), so I can get away
with some more fun tricks like shrinking various buffers and so forth.

> Is the map-output buffered to disk before the reduces start?

No... Right now I don't have combiners implemented... We do direct
shuffling, where IO is written directly to the reducer nodes instead of
being written to disk first.

I believe strongly *but need more evidence* that under practical loads
direct shuffling will be far superior to the indirect shuffling mechanism
that Hadoop uses. There ARE some situations, I think, where indirect
shuffling could solve pathological cases, but in practice these won't arise
(and certainly not in the pagerank impl and with our data).

We're going to buffer the IO so that about 250MB or so is put through a
combiner, then sent through snappy for compression, and then the result is
directly shuffled.
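As a rough illustration of that direct-shuffle path, the sketch below
buffers map output to about 250MB, runs it through a combiner hook,
compresses it with snappy-java's Snappy.compress(), and hands it straight
to a reducer connection. The Combiner and ReducerClient interfaces are
hypothetical placeholders, not Peregrine's actual API.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.xerial.snappy.Snappy;

// Conceptual sketch of direct shuffling: no local spill to disk, the
// combined + compressed chunk is shipped straight to the reducer node.
public class DirectShuffleSketch {

    static final int BUFFER_TARGET = 250 * 1024 * 1024; // ~250MB before flushing

    interface Combiner { byte[] combine(byte[] buffered); }     // hypothetical hook
    interface ReducerClient { void send(byte[] compressedChunk); } // hypothetical transport

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Combiner combiner;
    private final ReducerClient reducer;

    DirectShuffleSketch(Combiner combiner, ReducerClient reducer) {
        this.combiner = combiner;
        this.reducer = reducer;
    }

    void emit(byte[] record) throws IOException {
        buffer.write(record);
        if (buffer.size() >= BUFFER_TARGET) {
            flush();
        }
    }

    void flush() throws IOException {
        if (buffer.size() == 0) return;
        byte[] combined = combiner.combine(buffer.toByteArray());
        byte[] compressed = Snappy.compress(combined); // snappy-java
        reducer.send(compressed);                      // shipped directly, no spill to disk
        buffer.reset();
    }
}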
> # How does peregrine deal with failure of in-flight reduces (potentially
> after they have received X% of maps' outputs)?

The reduces are our major checkpoint mode right now. There are two solutions
I'm thinking about (and perhaps both will be implemented in production and
you can choose which strategy to use):

1. One replica of a partition starts a reduce; none of the blocks are
replicated; if it fails, the whole reduce has to start again.

2. All blocks are replicated, but if a reduce fails it can just resume on
another node.

I think #1 will be the best strategy in practice. A physical machine hosts
about 10 partitions, so even if a crash DOES happen and you have to resume a
reduce you're only redoing 1/10th of the data... And since recovery is now
happening, the other 9 partitions are split across 9 different hosts, so the
reduces there can be done in parallel.

> # How much does peregrine depend on PFS? One idea worth exploring might be
> to run peregrine within YARN (MR2) as an application. Would you be
> interested in trying that?

It depends heavily upon PFS... the block allocation is all done via the PFS
layer, and it needs to be deterministic or the partitioning functionality
will not work.

Also, all the IO is done through async IO... because at 10k machines you
can't do threaded IO, as it would require too much memory.

I was thinking the other day (and talking with my staff) that right now, if
you look at the distributed systems space, there is a LOT of activity around
Hadoop because it's one of the most widely deployed platforms out there.

But if you look at the *history* of computer science, we have NEVER settled
on a single OS, single programming language, single editor, etc. There is
always a set of choices out there because some tools are better suited to
the task than others. MySQL vs Postgres, Cassandra vs HBase, etc... and in a
lot of cases it's not even 'versus', as some tools are simply better for the
job than others.

I think this might be the Peregrine/Hadoop situation.

Peregrine would be VERY bad for some tasks right now, for example... If you
have log files to process and just want to grok them, query them, etc., then
a Hadoop / Pig / Hive setup would be WAY easier to run and far more
reliable.

My thinking is that Peregrine should just focus on the areas where I think
Hadoop could use some improvement: specifically iterative jobs and more
efficient pipelined IO...

I also think there are a lot of ergonomic areas where collaboration
should/could happen across a number of runtimes... for example the sysstat
package.

For our part, we're going to use the Hadoop CRC32 encoder for storing blocks
into PFS...
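For the checksumming step, here is a minimal sketch of what storing a block
with a CRC32 checksum could look like. It uses the JDK's java.util.zip.CRC32
purely as a stand-in for Hadoop's CRC32 code, and writeBlock() is a
hypothetical placeholder rather than the real PFS write path.

import java.util.zip.CRC32;

// Illustrative only: checksum a block before handing it to the filesystem
// layer, so a reader can verify integrity when the block is read back.
public class BlockChecksumSketch {

    static long checksum(byte[] block) {
        CRC32 crc = new CRC32();
        crc.update(block, 0, block.length);
        return crc.getValue();
    }

    // Hypothetical: a real implementation would persist the block and its
    // checksum through the PFS block-write path.
    static void writeBlock(byte[] block) {
        long crc = checksum(block);
        System.out.printf("would write %d bytes with crc32=%08x%n",
                          block.length, crc);
    }

    public static void main(String[] args) {
        writeBlock("example block payload".getBytes());
    }
}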
Example systat output from a run:

 Processor           %util
 ---------           -----
       cpu            2.00
      cpu0            6.00
      cpu1            2.00
      cpu2            2.00
      cpu3            1.00
      cpu4            3.00
      cpu5            1.00
      cpu6            1.00
      cpu7            1.00
      cpu8            5.00
      cpu9            2.00
     cpu10            1.00
     cpu11            1.00
     cpu12            4.00
     cpu13            1.00
     cpu14            1.00
     cpu15            1.00

      Disk       reads      writes   bytes read   bytes written   Avg req size    %util
      ----       -----      ------   ----------   -------------   ------------    -----
       sda      82,013      40,933   15,377,601      18,155,568         272.75   100.00

 Interface         bits rx         bits tx
 ---------         -------         -------
        lo             125             125
      eth0         122,918         233,160
      eth1              26              10
      sit0               0               0
-- 
Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
Skype: *burtonator*
Skype-in: *(415) 871-0687*