From: Ning Zhang <nzhang@fb.com>
To: user@hive.apache.org
Subject: Re: Hive produces very small files despite hive.merge...=true settings
Date: Sat, 20 Nov 2010 01:20:31 +0000

It makes sense. CombineHiveInputFormat does not work with compressed text
files (suffix *.gz), since gzip is not splittable. I think your default is
hive.input.format=CombineHiveInputFormat. But I think by setting
hive.mergejob.maponly=false it should work (meaning the merge should
succeed). With that setting you'll have multiple mappers (the same number
as the small files) and 1 reducer; the reducer's output should be the
merged result.
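Spelled out as Hive CLI settings, this is roughly (a minimal sketch using
only parameters that appear in this thread; defaults vary by build):

  -- keep compressed output, but force the merge job to run as a full
  -- map-reduce job: a map-only merge relies on CombineHiveInputFormat,
  -- which cannot split *.gz files
  set hive.exec.compress.output=true;
  set hive.merge.mapfiles=true;
  set hive.merge.mapredfiles=true;
  set hive.mergejob.maponly=false;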
On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:

> Folks, thanks for your help. I've narrowed the problem down to
> compression. When I set hive.exec.compress.output=false, merges
> proceed as expected. When compression is on, the merge job doesn't
> seem to actually merge; it just spits out the input.
>
> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he wrote:
>> These are the parameters that control the behavior. (Try setting them
>> to different values if they do not work in your environment.)
>>
>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>> set mapred.min.split.size.per.node=1000000000;
>> set mapred.min.split.size.per.rack=1000000000;
>> set mapred.max.split.size=1000000000;
>>
>> set hive.merge.size.per.task=1000000000;
>> set hive.merge.smallfiles.avgsize=1000000000;
>> set hive.merge.size.smallfiles.avgsize=1000000000;
>> set hive.exec.dynamic.partition.mode=nonstrict;
>>
>> The output size of the second job is also controlled by the split
>> size, as shown in the first four lines.
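As a rough worked example of that last point, using the output described
later in this thread (about 3000 files of ~1 KB each), and assuming
CombineHiveInputFormat can actually combine the inputs (which, per the
message at the top, it cannot for *.gz):

  merge mappers ~= ceil(total input bytes / mapred.max.split.size)
                 = ceil((3000 * 1024) / 1000000000)
                 = 1   -> a single merged output file of ~3 MB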
>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev wrote:
>>> I'm using Hadoop 0.20.2. Merge jobs (with static partitions) have
>>> worked for me in the past. Again, what's strange here is that with the
>>> latest Hive build the merge stage appears to run, but it doesn't
>>> actually merge -- it's a quick map-only job that, as near as I can
>>> tell, doesn't do anything.
>>>
>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema wrote:
>>>> What version of Hadoop are you on?
>>>>
>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev wrote:
>>>>>
>>>>> I thought I was running Hive with those changes merged in, but to
>>>>> make sure, I built the latest trunk version. The behavior changed
>>>>> somewhat (it now runs 2 stages instead of 1), but it still generates
>>>>> the same number of files (the number of files generated equals the
>>>>> number of original mappers, so I have no idea what the second stage
>>>>> is actually doing).
>>>>>
>>>>> See below for the query and its explain output. Stage-1 always runs;
>>>>> Stage-3 runs if hive.merge.mapfiles=true is set, but it still
>>>>> generates lots of small files.
>>>>>
>>>>> The query is kind of large, but in essence it's simply
>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>> [conditions].
>>>>>
>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>>> (ds) select
>>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>>> 'COUNTRY_CODE', './GeoIP.dat'),'',ds from alogs_master
>>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE', './GeoIP.dat')='US';
>>>>> OK
>>>>> ABSTRACT SYNTAX TREE:
>>>>> (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>> geoip_int (.
>>>>> (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>> './GeoIP.dat') 'US')))))
>>>>>
>>>>> STAGE DEPENDENCIES:
>>>>>   Stage-1 is a root stage
>>>>>   Stage-5 depends on stages: Stage-1, consists of Stage-4, Stage-3
>>>>>   Stage-4
>>>>>   Stage-0 depends on stages: Stage-4, Stage-3
>>>>>   Stage-2 depends on stages: Stage-0
>>>>>   Stage-3
>>>>>
>>>>> STAGE PLANS:
>>>>>   Stage: Stage-1
>>>>>     Map Reduce
>>>>>       Alias -> Map Operator Tree:
>>>>>         am_s
>>>>>           TableScan
>>>>>             alias: am_s
>>>>>             Filter Operator
>>>>>               predicate:
>>>>>                   expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>                   type: boolean
>>>>>               Filter Operator
>>>>>                 predicate:
>>>>>                     expr: ((request_url rlike
>>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>                     type: boolean
>>>>>                 Filter Operator
>>>>>                   predicate:
>>>>>                       expr: (((ds = '2010-11-05') and (request_url
>>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>                       type: boolean
>>>>>                   Select Operator
>>>>>                     expressions:
>>>>>                           expr: server_host
>>>>>                           type: string
>>>>>                           expr: client_ip
>>>>>                           type: int
>>>>>                           expr: time_stamp
>>>>>                           type: int
>>>>>                           expr: concat(server_host, ':',
>>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>                           type: string
>>>>>                           expr: referrer
>>>>>                           type: string
>>>>>                           expr: parse_url(referrer, 'HOST')
>>>>>                           type: string
>>>>>                           expr: user_agent
>>>>>                           type: string
>>>>>                           expr: cookie
>>>>>                           type: string
>>>>>                           expr: GenericUDFGeoIP ( client_ip,
>>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>                           type: string
>>>>>                           expr: ''
>>>>>                           type: string
>>>>>                           expr: ds
>>>>>                           type: string
>>>>>                     outputColumnNames: _col0, _col1, _col2, _col3,
>>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>                     File Output Operator
>>>>>                       compressed: true
>>>>>                       GlobalTableId: 1
>>>>>                       table:
>>>>>                           input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>                           name: hbase_prefilter3_us_sample
>>>>>
>>>>>   Stage: Stage-5
>>>>>     Conditional Operator
>>>>>
>>>>>   Stage: Stage-4
>>>>>     Move Operator
>>>>>       files:
>>>>>           hdfs directory: true
>>>>>           destination: hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>
>>>>>   Stage: Stage-0
>>>>>     Move Operator
>>>>>       tables:
>>>>>           partition:
>>>>>             ds
>>>>>           replace: true
>>>>>           table:
>>>>>               input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>               name: hbase_prefilter3_us_sample
>>>>>
>>>>>   Stage: Stage-2
>>>>>     Stats-Aggr Operator
>>>>>
>>>>>   Stage: Stage-3
>>>>>     Map Reduce
>>>>>       Alias -> Map Operator Tree:
>>>>>         hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>             File Output Operator
>>>>>               compressed: true
>>>>>               GlobalTableId: 0
>>>>>               table:
>>>>>                   input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>                   name: hbase_prefilter3_us_sample
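In the plan above, Stage-5 is the runtime Conditional Operator that picks
either Stage-4 (a plain move of -ext-10000) or Stage-3 (the extra merge
map-reduce over -ext-10002) -- presumably the job reported as "filtered
out (removed at runtime)" when the merge branch is skipped. One way to
check whether a merge actually happened is to list the file sizes in the
target partition from the Hive CLI (a sketch; the path assumes the
default warehouse layout):

  -- run inside the Hive CLI; adjust to your warehouse location
  dfs -du /user/hive/warehouse/hbase_prefilter3_us_sample/ds=2010-11-05;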
>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang wrote:
>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622
>>>>>> need to be there for merging to take place. HIVE-1307 was committed
>>>>>> to trunk on 08/25 and HIVE-1622 was committed on 09/13. The simplest
>>>>>> way is to update your Hive trunk and rerun the query. If it still
>>>>>> doesn't work, maybe you can post your query and the result of
>>>>>> 'explain <query>' and we can take a look.
>>>>>>
>>>>>> Ning
>>>>>>
>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>
>>>>>>> Hi Ning,
>>>>>>> For the dataset I'm experimenting with, the total size of the
>>>>>>> output is 2 MB, and the files are at most a few KB in size. My
>>>>>>> hive.input.format was set to the default HiveInputFormat; however,
>>>>>>> when I set it to CombineHiveInputFormat, it only made the first
>>>>>>> stage of the job use fewer mappers. The merge job was *still*
>>>>>>> filtered out at runtime. I also tried set
>>>>>>> hive.mergejob.maponly=false; that didn't have any effect.
>>>>>>>
>>>>>>> I am a bit at a loss what to do here. Is there a way to see
>>>>>>> exactly what's going on, e.g. using debug log levels?.. Btw, I'm
>>>>>>> also using dynamic partitions; could that somehow be interfering
>>>>>>> with the merge job?..
>>>>>>>
>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a
>>>>>>> month ago).
>>>>>>>
>>>>>>> --Leo
>>>>>>>
>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang wrote:
>>>>>>>> The settings look good. The parameter
>>>>>>>> hive.merge.size.smallfiles.avgsize is used to determine at run
>>>>>>>> time whether a merge should be triggered: if the average size of
>>>>>>>> the files in the partition is SMALLER than the parameter and there
>>>>>>>> is more than 1 file, the merge should be scheduled. Can you check
>>>>>>>> whether you have any big files as well in your resulting
>>>>>>>> partition? If it is because of a very large file, you can set the
>>>>>>>> parameter large enough.
>>>>>>>>
>>>>>>>> Another possibility is that your Hadoop installation does not
>>>>>>>> support CombineHiveInputFormat, which is used for the new merge
>>>>>>>> job. Someone previously reported that merge was not successful
>>>>>>>> because of this. If that's the case, you can turn off
>>>>>>>> CombineHiveInputFormat and use the old HiveInputFormat (though
>>>>>>>> slower) by setting hive.mergejob.maponly=false.
>>>>>>>>
>>>>>>>> Ning
>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>
>>>>>>>>> I have jobs that sample (or generate) a small amount of data
>>>>>>>>> from a large table. At the end, I get e.g. 3000 or more files of
>>>>>>>>> about 1 KB each. This becomes a nuisance. How can I make Hive do
>>>>>>>>> another pass to merge the output? I have the following settings:
>>>>>>>>>
>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>
>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>> MapReduce jobs = 2". However, after generating the
>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>
>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>> --Leo
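As a sanity check, Leo's numbers do satisfy the trigger condition Ning
describes above (more than one file, and average file size below
hive.merge.size.smallfiles.avgsize):

  files in partition ~= 3000    (> 1)
  average file size  ~= 1 KB    (< 16000000 bytes)

So the merge should have been scheduled, and the "filtered out (removed
at runtime)" message points elsewhere -- which, per the top of the
thread, turned out to be the compressed *.gz output defeating
CombineHiveInputFormat.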
>>>>
>>>> --
>>>> Dave Brondsema
>>>> Software Engineer
>>>> Geeknet
>>>>
>>>> www.geek.net