From: Gregory Lawrence
To: mapreduce-user@hadoop.apache.org
Date: Tue, 27 Jul 2010 10:47:02 -0700
Subject: Re: Pipelining Mappers and Reducers

Shai,

It’s hard to determine what the best solution would be without knowing more about your problem. In general, combiner functions work well, but they will be of little value if each mapper output contains a unique key. This is because combiner functions only “combine” multiple values associated with the same key (e.g., counting the number of occurrences of a word). Another common approach is to use two MapReduce jobs. The first job would use multiple reducers to do some processing. Here, you can hopefully shrink the size of the data by generating, for example, some sufficient statistics of what ultimately needs to be generated. A second MapReduce job would take the intermediate output and produce the desired result.
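To make the combiner point concrete, here is a minimal word-count-style sketch against the org.apache.hadoop.mapreduce (0.20 "new") API. TokenizerMapper is a hypothetical mapper that emits <word, 1> pairs; the same summing Reducer is reused as the combiner, which is safe because summing is associative and commutative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable result = new IntWritable();

      // Sums all partial counts seen for a word, whether invoked as the
      // combiner (on map output) or as the final reducer.
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
          sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setJarByClass(SumReducer.class);
        job.setMapperClass(TokenizerMapper.class); // hypothetical mapper emitting <word, 1>
        job.setCombinerClass(SumReducer.class);    // merges values per key before the shuffle
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The combiner only buys you anything here because many map outputs share the same key; in your case, with every key unique, it would have nothing to merge.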

As for your question about the reducer processing map outputs as they become ready: I believe the copy stage may start before all mappers are finished. However, the sorting and the application of your reduce function cannot proceed until every mapper has finished.
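If the copy stage is the part you want to start earlier, the point at which reduce tasks are launched is tunable. A small driver-side sketch, assuming the 0.20-era property name mapred.reduce.slowstart.completed.maps (verify the exact name and default in your release):

    // Launch reducers after ~5% of the maps have completed (often the default),
    // so the copy phase overlaps with the remaining map tasks.
    Configuration conf = new Configuration();
    conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.05f);
    Job job = new Job(conf, "early-shuffle job");

This only moves the copy phase earlier; as noted above, the sort and your reduce() calls still wait for the last map.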

Could you describe your problem in more detail?

Regards,
Greg Lawrence

On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com> wrote:

Hi

I have a scenario for which I'd like to write a MR job in which Mappers do some work and eventually the output of all Mappers needs to be combined by a single Reducer. Each Mapper outputs a <key,value> pair that is distinct from all other Mappers' outputs, meaning the Reducer.reduce() method always receives a single element in the values argument for a given key. Really - the Mappers are independent of each other in their output.

What would really be great for me is if I could have the Reducer start processing the map outputs as they are ready, and not after all Mappers finish. For example, I'm processing a very large data set and the MR framework spawns hundreds of Mappers for the task. The output of all Mappers, though, is required to be processed by 1 Reducer. It so happens that the Reducer job is very heavy compared to the Mappers, and while all Mappers finish in about 7 minutes (total wall clock time), the Reducer takes ~30 minutes.
In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that if I could stream the outputs of the Mappers to the Reducer, I could gain some cycles back - I can easily limit the number of Mappers to say 95 and have the Reducer constantly doing some work.
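For reference, the relevant part of my driver looks roughly like this (MyMapper and MyReducer stand in for my real classes):

    Job job = new Job(new Configuration(), "heavy single-reducer job");
    job.setMapperClass(MyMapper.class);    // hundreds of map tasks, ~7 minutes total
    job.setReducerClass(MyReducer.class);  // all map output funnels into this one task
    job.setNumReduceTasks(1);              // the ~30 minute bottleneck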

I've read about chaining mappers, but to the best of my understanding the second line of Mappers will only start after the first ones have finished. Am I correct?

Someone also hinted to me that I could write a Combiner that Hadoop might invoke on the Reducer's side when Mappers finish, if, say, the data of the Mappers is very large and cannot be kept in RAM. I haven't tried it yet, so if anyone can confirm this will indeed work, I'm willing to give it a try. The output of the Mappers is very large, and therefore they already write it directly to disk. So I'd like to avoid doing this serialization twice (once when the Mapper works, and the second time when Hadoop will *flush* the Reducer's buffer - or whatever the right terminology is).

I apologize if this has been raised before - if it has, could you please point me at the relevant discussion/issue?

Shai
