Subject: Pipelining Mappers and Reducers
From: Shai Erera <serera@gmail.com>
To: mapreduce-user@hadoop.apache.org
Date: Tue, 27 Jul 2010 14:06:53 +0300
Hi

I have a scenario for which I'd like to write a MR job in which Mappers do some work and eventually the output of all Mappers needs to be combined by a single Reducer. Each Mapper outputs <key,value> pairs whose keys are distinct from those of all other Mappers, meaning the Reducer.reduce() method always receives a single element in the values argument for a given key. Really - the Mappers are independent of each other in their output.
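
To make this concrete, here is a minimal sketch of the kind of Mapper/Reducer I mean (class names and the actual work are made up): every Mapper derives its keys from its own task id and record offset, so no two Mappers can emit the same key and reduce() always sees exactly one value per key.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SingleReducerSketch {

  public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      // Key is built from this task's id + the record offset, so it cannot
      // collide with a key emitted by any other Mapper.
      String key = context.getTaskAttemptID().getTaskID() + ":" + offset;
      context.write(new Text(key), new Text(doSomeWork(line.toString())));
    }

    private String doSomeWork(String line) {
      return line; // placeholder for the real (cheap) map-side work
    }
  }

  public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Keys are unique across Mappers, so 'values' holds exactly one element.
      for (Text value : values) {
        context.write(key, doHeavyWork(value));
      }
    }

    private Text doHeavyWork(Text value) {
      return value; // placeholder for the expensive reduce-side work
    }
  }
}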

What would really be great for me is if I could have the Reducer start processing the map outputs as they become ready, and not only after all Mappers finish. For example, I'm processing a very large data set and the MR framework spawns hundreds of Mappers for the task, yet the output of all Mappers is required to be processed by 1 Reducer. It so happens that the Reducer's job is very heavy compared to the Mappers': while all Mappers finish in about 7 minutes (total wall-clock time), the Reducer takes ~30 minutes.

In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that if I could stream the outputs of the Mappers to the Reducer, I could gain some cycles back - I can easily limit the number of Mappers to, say, 95 and have the Reducer constantly doing work.
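
For what it's worth, the closest knob I've found so far is the reduce slow-start fraction, which (if I understand correctly) only lets the Reducer start *copying* map outputs early - it doesn't start the actual reduce() calls before all Mappers finish. A sketch of the driver I have in mind (class name and the 1% value are just examples; the property name is the one from 0.20):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Launch the (single) reduce task after only 1% of the maps have
    // completed, so it can start copying map outputs while the remaining
    // maps are still running.
    conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.01f);

    Job job = new Job(conf, "heavy-single-reducer");
    job.setJarByClass(DriverSketch.class);
    job.setNumReduceTasks(1);
    // ... mapper/reducer/input/output settings omitted ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}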

I've read about chaining Mappers, but to the best of my understanding the second stage of Mappers will only start after the first ones have finished. Am I correct?
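
In case it clarifies what I mean by chaining: I was looking at the old-API ChainMapper (org.apache.hadoop.mapred.lib.ChainMapper), roughly along the lines of the sketch below, where AMap and BMap are placeholder Mappers of my own:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;

public class ChainSketch {

  // Placeholder first mapper: reads the job input.
  public static class AMap extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
      out.collect(new Text(key.toString()), value);
    }
  }

  // Placeholder second mapper: consumes AMap's output records.
  public static class BMap extends MapReduceBase
      implements Mapper<Text, Text, Text, Text> {
    public void map(Text key, Text value,
        OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
      out.collect(key, value);
    }
  }

  public static void configureChain(JobConf job) {
    ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
        Text.class, Text.class, true, new JobConf(false));
    ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
        Text.class, Text.class, true, new JobConf(false));
  }
}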

Someone also hinted to me that I could write a Combiner that Hadoop might invoke on the Reducer's side when Mappers finish, if, say, the Mappers' data is very large and cannot be kept in RAM. I haven't tried it yet, so if anyone can confirm this will indeed work, I'm willing to give it a try. The output of the Mappers is very large, and therefore they already write it directly to disk, so I'd like to avoid doing this serialization twice (once while the Mapper works, and a second time when Hadoop *flushes* the Reducer's buffer - or whatever the right terminology is).
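
And for completeness, this is the wiring I understand I'd need for the Combiner route - MyCombiner below is hypothetical (here it's just an identity pass-through), and as far as I understand it must be safe to run zero or more times and keep the map output types unchanged:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CombinerSketch {

  // Hypothetical combiner: input and output types must both match the map
  // output types, because Hadoop may run it zero, one, or several times.
  public static class MyCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        context.write(key, value); // placeholder: real pre-aggregation goes here
      }
    }
  }

  public static Job createJob(Configuration conf) throws IOException {
    Job job = new Job(conf, "combiner-sketch");
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // The combiner can be invoked on map-side spills, and also during the
    // reduce-side merge when map outputs are too large to stay in memory.
    job.setCombinerClass(MyCombiner.class);
    job.setNumReduceTasks(1);
    return job;
  }
}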

I apologize if this has been raised before - if it has, could you please point me at the relevant discussion/issue?

Shai