Subject: Re: Map-Reduce: How to make MR output one file an hour?
From: Fengyun RAO <raofengyun@gmail.com>
To: user@hadoop.apache.org
Cc: shekhar2581@gmail.com
Date: Sun, 2 Mar 2014 16:47:11 +0800

Thanks, Shekhar. I'm unfamiliar with Flume, but I will look into it later.

2014-03-02 15:36 GMT+08:00 Shekhar Sharma <shekhar2581@gmail.com>:

> Don't you think using Flume would be easier? Use the HDFS sink and a
> property to roll the log file every hour.
> This way you use a single Flume agent that receives the logs as they are
> generated and dumps them directly to HDFS.
> If you want to remove unwanted logs, you can write a custom sink before
> dumping to HDFS.
>
> I suppose this would be much easier.
>
> On 2 Mar 2014 12:34, "Fengyun RAO" <raofengyun@gmail.com> wrote:
>
>> Thanks, Simon. That's very clear.
>>
>> 2014-03-02 14:53 GMT+08:00 Simon Dong <simond301@gmail.com>:
>>
>>> Reading the data for each hour shouldn't be a problem: with Hadoop or
>>> the shell you can do pretty much everything with mmddhh* that you can
>>> do with mmddhh.
>>>
>>> But if you need each hour's data all sorted in one file, then you have
>>> to run a post-processing MR job on each hour's data to merge it, which
>>> should be very trivial.
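[Archive note: Shekhar's Flume suggestion earlier in the thread could be sketched as an agent configuration like the one below. The property names follow the Flume NG HDFS sink; the agent, source, and channel names are illustrative, and the exec source is only a placeholder for whatever actually produces the logs.]

```properties
# Sketch: a Flume agent whose HDFS sink rolls a new file every hour.
agent.sources = logSrc
agent.channels = memCh
agent.sinks = hdfsSink

# Placeholder source: tail the application log (illustrative path).
agent.sources.logSrc.type = exec
agent.sources.logSrc.command = tail -F /var/log/app.log
agent.sources.logSrc.channels = memCh

agent.channels.memCh.type = memory

agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memCh
# Bucket events into an hourly directory...
agent.sinks.hdfsSink.hdfs.path = hdfs://namenode/logs/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
# ...and roll strictly on time (every 3600 s), never on size or count.
agent.sinks.hdfsSink.hdfs.rollInterval = 3600
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.fileType = DataStream
```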
>>>
>>> With that being a requirement, using a custom partitioner to send all
>>> records within an hour to a particular reducer might be a viable, or even
>>> better, option that saves the additional MR pass to merge them, given:
>>>
>>> - You can determine programmatically, before submitting the job, the
>>> number of hours covered, and then call job.setNumReduceTasks(numOfHours)
>>> to set the number of reducers.
>>> - The number of hours each run covers matches the number of reducers
>>> your cluster typically assigns, so you won't lose much efficiency. For
>>> example, if each run covers the last 24 hours and your cluster defaults
>>> to 18 reducer slots, it should be fine.
>>> - You can emit the timestamp as the key from the mapper, so your
>>> partitioner can decide which reducer each record should be sent to, and
>>> the records will be sorted by MR by the time they reach the reducer.
>>>
>>> Even with this, you can still use MultipleOutputs to customize the file
>>> name each reducer generates, for better usability: i.e. instead of
>>> part-r-0000x, have it generate mmddhh-r-00000.
>>>
>>> -Simon
>>>
>>> On Sat, Mar 1, 2014 at 10:13 PM, Fengyun RAO <raofengyun@gmail.com> wrote:
>>>
>>>> Thank you, Simon! It helps a lot!
>>>>
>>>> We want one file per hour to ease the queries that follow:
>>>> it is very convenient to select several specified hours' results.
>>>>
>>>> We also need the records sorted by timestamp for later processing.
>>>> With a set of files per hour, as you show with MultipleOutputs, we
>>>> would have to merge-sort them later. Maybe that needs another MR job?
>>>>
>>>> 2014-03-02 13:14 GMT+08:00 Simon Dong <simond301@gmail.com>:
>>>>
>>>>> Fengyun,
>>>>>
>>>>> Is there any particular reason you have to have exactly one file per
>>>>> hour? As you probably know already, each reducer outputs one file, or,
>>>>> if you use MultipleOutputs as I suggested, a set of files.
>>>>> If you have to fit the number of reducers to the number of hours in
>>>>> the input, and generate the number of files accordingly, it will most
>>>>> likely come at the expense of cluster efficiency and performance. The
>>>>> worst case, of course, is a bunch of data all within the same hour:
>>>>> then you have to settle for one reducer, without any parallelization
>>>>> at all.
>>>>>
>>>>> A workaround is to use MultipleOutputs to generate a set of files for
>>>>> each hour, with the hour as the base name, or, if you so choose, a
>>>>> sub-directory for each hour. For example, with mmddhh as the base
>>>>> name, you will have a set of files for each hour like:
>>>>>
>>>>> 030119-r-00000
>>>>> ...
>>>>> 030119-r-0000n
>>>>> 030120-r-00000
>>>>> ...
>>>>> 030120-r-0000n
>>>>>
>>>>> Or in a sub-directory:
>>>>>
>>>>> 030119/part-r-00000
>>>>> ...
>>>>> 030119/part-r-0000n
>>>>>
>>>>> You can then use a wildcard to glob the output, either for manual
>>>>> processing or as the input path for subsequent jobs.
>>>>>
>>>>> -Simon
>>>>>
>>>>> On Sat, Mar 1, 2014 at 7:37 PM, Fengyun RAO <raofengyun@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Devin. We don't want just one file; it's more complicated:
>>>>>>
>>>>>> if the input folder contains data for X hours, we want X files;
>>>>>> if Y hours, we want Y files.
>>>>>>
>>>>>> Obviously, X or Y is unknown at compile time.
>>>>>>
>>>>>> 2014-03-01 20:48 GMT+08:00 Devin Suiter RDX <dsuiter@rdx.com>:
>>>>>>
>>>>>>> If you only want one file, then you need to set the number of
>>>>>>> reducers to 1.
>>>>>>>
>>>>>>> If the size of the data makes a single reducer impractical for the
>>>>>>> original MR job, run a second job on the output of the first, with
>>>>>>> the default (Identity) mapper and reducer, and set that job's
>>>>>>> numReducers = 1.
>>>>>>>
>>>>>>> Or use the hdfs getmerge function to collate the results into one
>>>>>>> file.
>>>>>>>
>>>>>>> On Mar 1, 2014 4:59 AM, "Fengyun RAO" <raofengyun@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, but how do I set the reducer number to X? X depends on the
>>>>>>>> input (run time), which is unknown at job configuration (compile
>>>>>>>> time).
>>>>>>>>
>>>>>>>> 2014-03-01 17:44 GMT+08:00 AnilKumar B <akumarb2010@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Write a custom partitioner on <timestamp> and, as you mentioned,
>>>>>>>>> set #reducers to X.
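[Archive note: the custom partitioner Simon and Anil describe reduces to bucketing the map-output timestamp into an hour index. The self-contained sketch below shows only that arithmetic (names illustrative); in a real job it would be the body of getPartition() in a subclass of org.apache.hadoop.mapreduce.Partitioner, with the driver scanning the input's time range and calling job.setNumReduceTasks(numOfHours) before submission.]

```java
// Sketch of the hour-bucketing logic behind the custom partitioner
// discussed in the thread. Records whose timestamps fall in the same
// hour always map to the same reducer index.
public class HourPartitionSketch {
    static final long HOUR_MILLIS = 3600000L;

    // startMillis is the start of the time range covered by this run;
    // the driver would compute it and pass it via the job configuration.
    static int getPartition(long tsMillis, long startMillis, int numPartitions) {
        int hourIndex = (int) ((tsMillis - startMillis) / HOUR_MILLIS);
        return hourIndex % numPartitions;
    }

    public static void main(String[] args) {
        long start = 0L;
        // Two records in hour 3 land on the same reducer.
        System.out.println(getPartition(3 * HOUR_MILLIS + 5, start, 24));
        System.out.println(getPartition(3 * HOUR_MILLIS + 1999, start, 24));
    }
}
```

Because the mapper emits the timestamp as the key, MR's shuffle sort then delivers each hour's records to its reducer already in timestamp order, which is what makes the extra merge pass unnecessary.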