hive-user mailing list archives

From Leo Alekseyev <dnqu...@gmail.com>
Subject Re: Hive produces very small files despite hive.merge...=true settings
Date Tue, 23 Nov 2010 06:05:15 GMT
I found another factor that determines whether or not the merge job runs
when compression is turned on.  It seems that if the target table is
stored as an RCFile the merge works, but if it is stored as a text file
the merge fails.  For instance:

-- merge will work here:
create table  alogs_dbg_sample3 (server_host STRING, client_ip INT,
time_stamp INT) row format serde
'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' stored as
rcfile;
insert overwrite table alogs_dbg_sample3 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

-- merge will fail here:
create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
time_stamp INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;
insert overwrite table alogs_dbg_sample2 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

Is this a bug?..  I don't see why running the merge job should be
sensitive to the output table's storage format.
P.S.:  I have hive.merge.maponly=true and am using LZO compression.
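For completeness, here is a rough sketch of the session settings in play on my
side for both cases above; these are the values I mention elsewhere in this
thread, so take them as illustrative rather than a verified minimal repro:

-- compression on (LZO in my case); switching it off makes the merge behave,
-- as discussed in the quoted thread below
set hive.exec.compress.output=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.maponly=true;
set hive.merge.size.per.task=256000000;
set hive.merge.size.smallfiles.avgsize=16000000;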

On Fri, Nov 19, 2010 at 5:20 PM, Ning Zhang <nzhang@fb.com> wrote:
> It makes sense. CombineHiveInputFormat does not work with compressed text files (suffix
> *.gz) since they are not splittable. I think your default is hive.input.format=CombineHiveInputFormat.
> But I think by setting hive.merge.maponly it should work (meaning the merge should succeed).
> By setting hive.merge.maponly, you'll have multiple mappers (the same # of small files) and
> 1 reducer. The reducer's output should be the merged result.
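(A minimal sketch of what I understand the suggestion to be; the flag is
spelled hive.merge.maponly in some places in this thread and
hive.mergejob.maponly in others, so treat the exact property name as a guess
for your build:

set hive.mergejob.maponly=false;

i.e. skip the map-only merge that relies on CombineHiveInputFormat and let a
single reducer write the merged output.)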
>
> On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:
>
>> Folks, thanks for your help.  I've narrowed the problem down to
>> compression.  When I set hive.exec.compress.output=false, merges
>> proceed as expected.  When compression is on, the merge job doesn't
>> seem to actually merge; it just spits out the input.
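(Concretely, the only knob I'm flipping between the failing and working runs
is, as far as I can tell:

set hive.exec.compress.output=false;

with all the hive.merge.* settings left untouched.)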
>>
>> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiangict@gmail.com> wrote:
>>> These are the parameters that control the behavior. (Try to set them
>>> to different values if it does not work in your environment.)
>>>
>>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>>> set mapred.min.split.size.per.node=1000000000;
>>> set mapred.min.split.size.per.rack=1000000000;
>>> set mapred.max.split.size=1000000000;
>>>
>>> set hive.merge.size.per.task=1000000000;
>>> set hive.merge.smallfiles.avgsize=1000000000;
>>> set hive.merge.size.smallfiles.avgsize=1000000000;
>>> set hive.exec.dynamic.partition.mode=nonstrict;
>>>
>>>
>>> The output size of the second job is also controlled by the split
>>> size, as shown in the first 4 lines.
>>>
>>>
>>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnquark@gmail.com> wrote:
>>>> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
>>>> worked for me in the past.  Again, what's strange here is that with the
>>>> latest Hive build the merge stage appears to run, but it doesn't
>>>> actually merge -- it's a quick map-only job that, near as I can tell,
>>>> doesn't do anything.
>>>>
>>>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbrondsema@geek.net> wrote:
>>>>> What version of Hadoop are you on?
>>>>>
>>>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnquark@gmail.com> wrote:
>>>>>>
>>>>>> I thought I was running Hive with those changes merged in, but to make
>>>>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>>>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>>>>> same number of files (the # of files generated is equal to the number of
>>>>>> original mappers, so I have no idea what the second stage is
>>>>>> actually doing).
>>>>>>
>>>>>> See below for the query and its explain output.  Stage-1 always runs; Stage-3
>>>>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>>>>> of small files.
>>>>>>
>>>>>> The query is kind of large, but in essence it's simply
>>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>>> [conditions].
>>>>>>
>>>>>>
>>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>>>> (ds) select
>>>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>>>>> OK
>>>>>> ABSTRACT SYNTAX TREE:
>>>>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>>> './GeoIP.dat') 'US')))))
>>>>>>
>>>>>> STAGE DEPENDENCIES:
>>>>>>  Stage-1 is a root stage
>>>>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>>>  Stage-4
>>>>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>>>>  Stage-2 depends on stages: Stage-0
>>>>>>  Stage-3
>>>>>>
>>>>>> STAGE PLANS:
>>>>>>  Stage: Stage-1
>>>>>>    Map Reduce
>>>>>>      Alias -> Map Operator Tree:
>>>>>>        am_s
>>>>>>          TableScan
>>>>>>            alias: am_s
>>>>>>            Filter Operator
>>>>>>              predicate:
>>>>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>>                  type: boolean
>>>>>>              Filter Operator
>>>>>>                predicate:
>>>>>>                    expr: ((request_url rlike
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>                    type: boolean
>>>>>>                Filter Operator
>>>>>>                  predicate:
>>>>>>                      expr: (((ds = '2010-11-05') and (request_url
>>>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>                      type: boolean
>>>>>>                  Select Operator
>>>>>>                    expressions:
>>>>>>                          expr: server_host
>>>>>>                          type: string
>>>>>>                          expr: client_ip
>>>>>>                          type: int
>>>>>>                          expr: time_stamp
>>>>>>                          type: int
>>>>>>                          expr: concat(server_host, ':',
>>>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>>                          type: string
>>>>>>                          expr: referrer
>>>>>>                          type: string
>>>>>>                          expr: parse_url(referrer, 'HOST')
>>>>>>                          type: string
>>>>>>                          expr: user_agent
>>>>>>                          type: string
>>>>>>                          expr: cookie
>>>>>>                          type: string
>>>>>>                          expr: GenericUDFGeoIP ( client_ip,
>>>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>>                          type: string
>>>>>>                          expr: ''
>>>>>>                          type: string
>>>>>>                          expr: ds
>>>>>>                          type: string
>>>>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>>                    File Output Operator
>>>>>>                      compressed: true
>>>>>>                      GlobalTableId: 1
>>>>>>                      table:
>>>>>>                          input format:
>>>>>> org.apache.hadoop.mapred.TextInputFormat
>>>>>>                          output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>                          serde:
>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>                          name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>  Stage: Stage-5
>>>>>>    Conditional Operator
>>>>>>
>>>>>>  Stage: Stage-4
>>>>>>    Move Operator
>>>>>>      files:
>>>>>>          hdfs directory: true
>>>>>>          destination:
>>>>>>
>>>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>>
>>>>>>  Stage: Stage-0
>>>>>>    Move Operator
>>>>>>      tables:
>>>>>>          partition:
>>>>>>            ds
>>>>>>          replace: true
>>>>>>          table:
>>>>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>              output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>              name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>  Stage: Stage-2
>>>>>>    Stats-Aggr Operator
>>>>>>
>>>>>>  Stage: Stage-3
>>>>>>    Map Reduce
>>>>>>      Alias -> Map Operator Tree:
>>>>>>
>>>>>>  hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>>            File Output Operator
>>>>>>              compressed: true
>>>>>>              GlobalTableId: 0
>>>>>>              table:
>>>>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>                  output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>                  name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzhang@fb.com> wrote:
>>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>>>>> to be there for merging to take place. HIVE-1307 was committed to trunk on
>>>>>>> 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update
>>>>>>> your Hive trunk and rerun the query. If it still doesn't work, maybe you can
>>>>>>> post your query and the result of 'explain <query>' and we can take a look.
>>>>>>>
>>>>>>> Ning
>>>>>>>
>>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>>
>>>>>>>> Hi Ning,
>>>>>>>> For the dataset I'm experimenting with, the total size of the output
>>>>>>>> is 2 MB, and the files are at most a few KB in size.  My
>>>>>>>> hive.input.format was set to the default HiveInputFormat; however, when I
>>>>>>>> set it to CombineHiveInputFormat, it only made the first stage of the
>>>>>>>> job use fewer mappers.  The merge job was *still* filtered out at
>>>>>>>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>>>>>>> have any effect.
>>>>>>>>
>>>>>>>> I am a bit at a loss what to do here.  Is there a way to see what's
>>>>>>>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>>>>>>> dynamic partitions; could that somehow be interfering with the merge
>>>>>>>> job?..
>>>>>>>>
>>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>>>>>> ago).
>>>>>>>>
>>>>>>>> --Leo
>>>>>>>>
>>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzhang@fb.com> wrote:
>>>>>>>>> The settings look good. The parameter
>>>>>>>>> hive.merge.size.smallfiles.avgsize is used to determine at run time if a
>>>>>>>>> merge should be triggered: if the average size of the files in the partition
>>>>>>>>> is SMALLER than the parameter and there is more than 1 file, the merge
>>>>>>>>> should be scheduled. Can you check whether you have any big files as well in
>>>>>>>>> your resulting partition? If it is because of a very large file, you can set
>>>>>>>>> the parameter large enough.
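(For what it's worth, a quick way to eyeball that from the Hive CLI is to list
the per-file sizes in the partition directory; the path here is only a
placeholder for wherever the table actually lives in your warehouse:

dfs -du /user/hive/warehouse/hbase_prefilter3_us_sample/ds=2010-11-05;

In my case every file is only a few KB, so there is no large outlier pulling
the average up.)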
>>>>>>>>>
>>>>>>>>> Another possibility is that your Hadoop installation does not support
>>>>>>>>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>>>>>>> previously reported that the merge was not successful because of this. If that's the
>>>>>>>>> case, you can turn off CombineHiveInputFormat and use the old
>>>>>>>>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>>>>>>>
>>>>>>>>> Ning
>>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>>
>>>>>>>>>> I have jobs that sample (or generate) a small amount of data from a
>>>>>>>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>>>>>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>>>>>>>> to merge the output?  I have the following settings:
>>>>>>>>>>
>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>>
>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>> MapReduce jobs = 2".  However, after generating the
>>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>
>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>> --Leo
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Dave Brondsema
>>>>> Software Engineer
>>>>> Geeknet
>>>>>
>>>>> www.geek.net
>>>>>
>>>>
>>>
>
>
