Subject: Re: chaining (the output of) jobs/ reducers
From: Adrian CAPDEFIER <chivas314159@gmail.com>
To: user@hadoop.apache.org
Date: Thu, 12 Sep 2013 20:02:12 +0100

Thanks Bryan.

Yes, I am using hadoop + hdfs.

If I understand your point, hadoop tries to start the mapping processes on
nodes where the data is local and, if that's not possible, it is hdfs that
replicates the data to the mapper nodes?

I expected to have to set this up in code and I completely ignored HDFS; I
guess it's a case of not seeing the forest for the trees!


On Thu, Sep 12, 2013 at 6:38 PM, Bryan Beaudreault
<bbeaudreault@hubspot.com> wrote:

> It really comes down to the following:
>
> In Job A, set mapred.output.dir to some directory X.
> In Job B, set mapred.input.dir to the same directory X.
>
> For Job A, call context.write() as you normally would, and each reducer
> will create an output file in mapred.output.dir. Then in Job B each of
> those files will correspond to a mapper.
>
> Of course you need to make sure your input and output formats, as well as
> input and output keys/values, match up between the two jobs.
> If you are using HDFS, which it seems you are, the directories specified
> can be HDFS directories. In that case, with a replication factor of 3,
> each of these output files will exist on 3 nodes. Hadoop and HDFS will do
> the work to ensure that the mappers in the second job are as data- or
> rack-local as possible.
>
>
> On Thu, Sep 12, 2013 at 12:35 PM, Adrian CAPDEFIER
> <chivas314159@gmail.com> wrote:
>
>> Thank you, Chris. I will look at Cascading and Pig, but for starters I'd
>> prefer to keep everything as close to the hadoop libraries as possible.
>>
>> I am sure I am overlooking something basic, as repartitioning is a
>> fairly common operation in MPP environments.
>>
>>
>> On Thu, Sep 12, 2013 at 2:39 PM, Chris Curtin <curtin.chris@gmail.com> wrote:
>>
>>> If you want to stay in Java, look at Cascading. Pig is also helpful. I
>>> think there are others (Spring Integration maybe?) but I'm not familiar
>>> enough with them to make a recommendation.
>>>
>>> Note that with Cascading and Pig you don't write 'map reduce'; you
>>> write logic and they map it to the various mapper/reducer steps
>>> automatically.
>>>
>>> Hope this helps,
>>>
>>> Chris
>>>
>>>
>>> On Thu, Sep 12, 2013 at 9:36 AM, Adrian CAPDEFIER
>>> <chivas314159@gmail.com> wrote:
>>>
>>>> Howdy,
>>>>
>>>> My application requires 2 distinct processing steps (reducers) to be
>>>> performed on the input data. The first operation changes the key
>>>> values, and records that had different keys in step 1 can end up
>>>> having the same key in step 2.
>>>>
>>>> The heavy lifting of the operation is in step 1; step 2 only combines
>>>> records whose keys were changed.
>>>>
>>>> In short, the overview is:
>>>> Sequential file -> Step 1 -> Step 2 -> Output.
>>>>
>>>> To implement this in hadoop, it seems that I need to create a separate
>>>> job for each step.
>>>>
>>>> Now, I assumed there would be some sort of job management under hadoop
>>>> to link Job 1 and Job 2, but the only thing I could find was related
>>>> to job scheduling and nothing on how to synchronize the input/output
>>>> of the linked jobs.
>>>>
>>>> The only crude solution that I can think of is to use a temporary file
>>>> under HDFS, but even so I'm not sure if this will work.
>>>>
>>>> The overview of the process would be:
>>>> Sequential Input (lines) => Job A[Mapper (key1, value1) => ChainReducer
>>>> (key2, value2)] => Temporary file => Job B[Mapper (key2, value2) =>
>>>> Reducer (key2, value3)] => Output.
>>>>
>>>> Is there a better way to pass the output from Job A as input to Job B
>>>> (e.g. using network streams or some built-in java classes that don't
>>>> do disk i/o)?
>>>>
>>>> The temporary file solution will work in a single node configuration,
>>>> but I'm not sure about an MPP config.
>>>>
>>>> Let's say Job A runs on nodes 0 and 1 and Job B runs on nodes 2 and 3,
>>>> or both jobs run on all 4 nodes - will HDFS be able to redistribute
>>>> the records between nodes automagically, or does this need to be coded
>>>> somehow?
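
For reference, below is a minimal driver sketch of the pattern described
above: Job A writes its reducer output to a shared HDFS directory and, once
it has completed, Job B reads that same directory as its input. It assumes
the Hadoop 2 org.apache.hadoop.mapreduce API and a SequenceFile intermediate
format; the identity Mapper.class / Reducer.class placeholders, the
LongWritable/Text types and the argument paths are illustrative stand-ins
for the real step 1 and step 2 classes, not a drop-in implementation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class ChainedStepsDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input        = new Path(args[0]); // original sequential (line) input
    Path intermediate = new Path(args[1]); // the shared HDFS directory "X"
    Path output       = new Path(args[2]); // final output

    // Step 1: does the heavy lifting; each reducer writes one part file
    // into the intermediate directory.
    Job jobA = Job.getInstance(conf, "step 1");
    jobA.setJarByClass(ChainedStepsDriver.class);
    jobA.setMapperClass(Mapper.class);     // placeholder: your step 1 mapper
    jobA.setReducerClass(Reducer.class);   // placeholder: your step 1 reducer
    jobA.setOutputKeyClass(LongWritable.class);
    jobA.setOutputValueClass(Text.class);
    jobA.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(jobA, input);
    FileOutputFormat.setOutputPath(jobA, intermediate);
    if (!jobA.waitForCompletion(true)) {
      System.exit(1);                      // don't start step 2 if step 1 failed
    }

    // Step 2: reads the same directory; the part files written by step 1
    // become the map input, scheduled as close to the data as possible.
    Job jobB = Job.getInstance(conf, "step 2");
    jobB.setJarByClass(ChainedStepsDriver.class);
    jobB.setMapperClass(Mapper.class);     // placeholder: your step 2 mapper
    jobB.setReducerClass(Reducer.class);   // placeholder: your step 2 reducer
    jobB.setOutputKeyClass(LongWritable.class);
    jobB.setOutputValueClass(Text.class);
    jobB.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(jobB, intermediate);
    FileOutputFormat.setOutputPath(jobB, output);
    System.exit(jobB.waitForCompletion(true) ? 0 : 1);
  }
}

FileInputFormat.addInputPath() and FileOutputFormat.setOutputPath() set the
same properties Bryan mentions (mapred.input.dir / mapred.output.dir,
renamed in later releases), and because SequenceFiles are splittable, Job B
should get roughly one map task per block of each part file.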