From: Amogh Vasekar
To: mapreduce-user@hadoop.apache.org
Date: Tue, 27 Jul 2010 17:04:27 +0530
Subject: Re: Pipelining Mappers and Reducers

Hi,
>>What would really be great for me is if I could have the Reducer start processing the map outputs as they are ready, and not after all Mappers finish

Check the property mapred.reduce.slowstart.completed.maps - it sets the fraction of map tasks that must complete before the framework schedules the reduce tasks, so the reducer can start fetching map outputs while the remaining maps are still running. (The reduce() calls themselves still begin only after all map output has arrived.)
>>I've read about chaining mappers, but to the best of my understanding the second line of Mappers will only start after the first ones finished. Am I correct?

Not exactly. With chained Mappers all the transformations run in one go inside the same map task, record by record, until the reduce barrier is reached - there is no second wave of map tasks waiting on the first.
>>Someone also hinted to me that I could write a Combiner that Hadoop might invoke on the Reducer's side when Mappers finish,

Combiners can be run on both the map side and the reduce side, as soon as the in-memory buffer fills up and spills (many configuration properties control this). They work well when your reduce operation is associative and commutative - a sum or a max, say - but not when it is something like an average, since an average of partial averages is not the overall average.
HTH,
Amogh

On 7/27/10 4:36 PM, "Shai Erera" <serera@gmail.com> wrote:

Hi

I have a scenario for which I'd like to write a MR job in which Mappers do = some work and eventually the output of all mappers need to be combined by a= single Reducer. Each Mapper outputs <key,value> that is distinct fro= m all other Mappers, meaning the Reducer.reduce() method always receives a = single element in the values argument of a specific key. Really - the Mappe= rs are independent of each others in their output.

What would really be great for me is if I could have the Reducer start proc= essing the map outputs as they are ready, and not after all Mappers finish.= For example, I'm processing a very large data set and the MR framework spa= wns hundreds of Mappers for the task. The output of all Mappers though is r= equired to be processed by 1 Reducer. It so happens to be that the Reducer = job is very heavy, compared to the Mappers, and while all Mappers finish in= about 7 minutes (total wall clock time), the Reducer takes ~30 minutes.
In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that if = I could streamline the outputs of the Mappers to the Reducer, I could gain = some cycles back - I can easily limit the number of Mappers to say 95 and h= ave the Reducer constantly doing some job.

I've read about chaining mappers, but to the best of my understanding the s= econd line of Mappers will only start after the first ones finished. Am I c= orrect?

Someone also hinted to me that I could write a Combiner that Hadoop might i= nvoke on the Reducer's side when Mappers finish, if say the data of the Map= pers is very large and cannot be kept in RAM. I haven't tried it yet, so if= anyone can confirm this will indeed work, I'm willing to give it a try. Th= e output of the Mappers is very large, and therefore they already write it = directly to disk. So I'd like to avoid doing this serialization twice (once= when the Mapper works, and the second time when Hadoop will *flush* the Re= ducer's buffer - or whatever the right terminology is).

I apologize if this has been raised before - if it has, could you please po= int me at the relevant discussion/issue?

Shai
