hbase-user mailing list archives

From Ryan Rawson <ryano...@gmail.com>
Subject Re: ICV concurrency problem (?)
Date Fri, 11 Jun 2010 23:05:31 GMT
I would tend to agree with Todd here; I have generally used 'close()'
in the Hadoop 0.20 MR API to accomplish such tasks.

-ryan

On Fri, Jun 11, 2010 at 4:02 PM, Todd Lipcon <todd@cloudera.com> wrote:
> On Fri, Jun 11, 2010 at 3:56 PM, Mark Laffoon <mlaffoon@semanticresearch.com> wrote:
>
>> Follow-up to this issue, with a test map job to demonstrate it.
>>
>> I created RandomInputFormat, which lets you configure the number of
>> input splits and the size of each split. The record reader generates a
>> random key (UUID) and a value that is unique to each split. For example,
>> you can set it up to have 10 splits of 100,000 records each. This will
>> produce 1,000,000 records, each with a unique key, and values ranging from
>> 0 to 9 (each repeated 100,000 times).
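The input format described above can be approximated in a few lines of plain Java. This is a hypothetical sketch, not Mark's RandomInputFormat: `SyntheticSplit`, `records()` and `job()` are invented names. It assumes the per-split value is simply the split index, which matches the c:0 through c:9 columns listed for the 10-split example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of what one split of a RandomInputFormat-style reader
// might emit: a random UUID key per record, and the split index as the value.
final class SyntheticSplit {
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    /** Generates splitSize records for the split numbered splitIndex. */
    static List<Rec> records(int splitIndex, int splitSize) {
        List<Rec> out = new ArrayList<>(splitSize);
        String value = Integer.toString(splitIndex); // same value for the whole split
        for (int i = 0; i < splitSize; i++) {
            out.add(new Rec(UUID.randomUUID().toString(), value));
        }
        return out;
    }

    /** Generates every split of a job with numSplits splits of splitSize records. */
    static List<Rec> job(int numSplits, int splitSize) {
        List<Rec> all = new ArrayList<>(numSplits * splitSize);
        for (int s = 0; s < numSplits; s++) {
            all.addAll(records(s, splitSize));
        }
        return all;
    }

    public static void main(String[] args) {
        List<Rec> all = job(10, 1_000);
        Set<String> keys = new HashSet<>();
        Set<String> values = new HashSet<>();
        for (Rec r : all) { keys.add(r.key); values.add(r.value); }
        // expected: 10000 records, 10000 distinct keys, 10 distinct values ("0".."9")
        System.out.println(all.size() + " " + keys.size() + " " + values.size());
    }
}
```

The split size is scaled down from 100,000 to 1,000 so the example runs quickly. The shape of the data (one value per split, unique keys) is unchanged.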
>>
>> Then I created a simple map job that accepts the input, writes an HBase
>> row for each record, counts the records, and, at the end of the job in
>> cleanup(), increments count columns (using HTable.incrementColumnValue, ICV) in a special
>> "count" row. For the example given, the count row would have columns
>> c:0,c:1,...,c:9 each with the value 100,000, and an additional column c:n
>> with the total, 1,000,000.
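For a given split count and split size, the contents the count row should end up with follow directly from the description above. Here is a minimal sketch. `ExpectedCountRow` and `expected()` are hypothetical names. Columns are keyed as `c:<split value>`, plus `c:n` for the total, as described.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper computing the "count" row contents the test job should
// end up with: one column c:<split value> per split plus the total c:n.
final class ExpectedCountRow {
    static Map<String, Long> expected(int numSplits, long splitSize) {
        Map<String, Long> row = new LinkedHashMap<>();
        for (int s = 0; s < numSplits; s++) {
            row.put("c:" + s, splitSize);          // each split adds splitSize to its own column
        }
        row.put("c:n", numSplits * splitSize);     // grand total across all splits
        return row;
    }

    public static void main(String[] args) {
        Map<String, Long> row = expected(10, 100_000L);
        System.out.println(row.get("c:0") + " " + row.get("c:9") + " " + row.get("c:n"));
        // prints "100000 100000 1000000"
    }
}
```

For the 80 x 120,000 run Mark describes, `expected(80, 120_000L)` gives the following values:

* 120,000 in each of the 80 split columns.
* 9,600,000 for `c:n`.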
>>
>> I'm running all this on a 16-node cluster. For small jobs it works fine.
>> For larger jobs restricted to a single CPU it works fine. However, if I
>> crank up the number of splits and the split size, and let it run on multiple
>> nodes, I start to lose counts. For example, I just ran 80 x 120,000. All
>> the individual count column values looked good (120,000), but the total was
>> 8,640,000 instead of 9,600,000.
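The reported shortfall is an exact multiple of the split size:

```latex
\begin{aligned}
80 \times 120\,000 &= 9\,600\,000 \quad \text{(expected total in } c{:}n\text{)}\\
9\,600\,000 - 8\,640\,000 &= 960\,000 = 8 \times 120\,000\\
8\,640\,000 / 120\,000 &= 72
\end{aligned}
```

So only 72 of the 80 splits' contributions reached the total column c:n. Every per-split column still read 120,000.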
>>
>
> Are you sure that cleanup() always runs? I don't know the semantics of
> cleanup in the new API, but the fact that you got such a nice round number
> indicates that several entire processes didn't get counted (not just some
> lost edits due to a race).
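Todd's question turns on when cleanup() is reached. The sketch below is a hypothetical, simplified model of a setup/map/cleanup lifecycle, not Hadoop's implementation. `LifecycleModel`, `runPlain`, `runWithFinally` and `RecordingTask` are invented names.

It contrasts two ways of calling cleanup():

* A loop that calls cleanup() only after every map() call returns normally.
* A loop that calls cleanup() from a `finally` block.

In the first shape, an exception thrown from map() skips cleanup() entirely. Neither shape runs cleanup() if the task's JVM is killed outright. Check which shape a given Hadoop version uses against its own Mapper.run() source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a setup/map/cleanup task lifecycle; NOT Hadoop's code.
final class LifecycleModel {
    interface Task {
        void setup();
        void map(String record);
        void cleanup();
    }

    /** cleanup() is reached only if every map() call returns normally. */
    static void runPlain(Task t, Iterator<String> input) {
        t.setup();
        while (input.hasNext()) {
            t.map(input.next());
        }
        t.cleanup();
    }

    /** cleanup() runs even when map() throws; the exception still propagates. */
    static void runWithFinally(Task t, Iterator<String> input) {
        t.setup();
        try {
            while (input.hasNext()) {
                t.map(input.next());
            }
        } finally {
            t.cleanup();
        }
    }

    /** A task that records lifecycle events and fails on one chosen record. */
    static final class RecordingTask implements Task {
        final List<String> events = new ArrayList<>();
        private final String failOn;
        RecordingTask(String failOn) { this.failOn = failOn; }
        public void setup() { events.add("setup"); }
        public void map(String record) {
            if (record.equals(failOn)) throw new IllegalStateException("failed on " + record);
            events.add("map:" + record);
        }
        public void cleanup() { events.add("cleanup"); }
    }

    /** Runs one simulated task attempt and returns the events it produced. */
    static List<String> trace(boolean useFinally, List<String> input, String failOn) {
        RecordingTask t = new RecordingTask(failOn);
        try {
            if (useFinally) runWithFinally(t, input.iterator());
            else runPlain(t, input.iterator());
        } catch (IllegalStateException expected) {
            // the attempt "fails"; we only care which lifecycle events happened
        }
        return t.events;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("a", "b", "c");
        System.out.println(trace(false, in, "b")); // [setup, map:a]
        System.out.println(trace(true, in, "b"));  // [setup, map:a, cleanup]
    }
}
```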
>
>
>>
>> Is there some behavior of ICV I'm not grokking?
>>
>> I'm in the process of trying to simplify the test but any advice, ideas,
>> thoughts would be appreciated.
>>
>> Thanks,
>> Mark
>>
>> P.S. here is the code for the mapper ...
>>
>>    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
>>        private HTable table;
>>        Map<String, Long> counts = new HashMap<String, Long>();
>>
>>        @Override
>>        protected void setup(Context context)
>>                throws IOException, InterruptedException {
>>            final HBaseConfiguration conf =
>>                    new HBaseConfiguration(context.getConfiguration());
>>            table = new HTable(conf, TABLENAME);
>>            table.setAutoFlush(false);
>>        }
>>
>>        @Override
>>        protected void cleanup(Context context)
>>                throws IOException, InterruptedException {
>>
>>            long totalCount = 0;
>>            for (Map.Entry<String,Long> entry : counts.entrySet()) {
>>                final String countStr = entry.getKey();
>>                final long count = entry.getValue();
>>                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>>                        countStr.getBytes(), count);
>>                totalCount += count;
>>            }
>>            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>>                    COUNT_QUALIFIER, totalCount);
>>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>>                    COUNT_QUALIFIER, totalCount);
>>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>>                    COUNT2_QUALIFIER, totalCount);
>>            table.close();
>>        }
>>
>>        @Override
>>        protected void map(Text key, Text value, Context context)
>>                throws IOException, InterruptedException {
>>
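>>            // Text.getBytes() returns the backing buffer, which may be longer
>>            // than the text itself; copy only the valid getLength() bytes.
>>            Put put = new Put(Arrays.copyOf(key.getBytes(), key.getLength()));
>>            put.add(DATA_FAMILY, DATA_QUALIFIER,
>>                    Arrays.copyOf(value.getBytes(), value.getLength()));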
>>            table.put(put);
>>            table.flushCommits();
>>            final String count = value.toString();
>>            counts.put(count, 1L + (counts.containsKey(count) ?
>>                    counts.get(count) : 0L));
>>            context.getCounter("Debug", "ICV count").increment(1);
>>            context.write(key, value);
>>        }
>>    }
>>
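The mapper above aggregates counts locally in map() and issues one increment per column in cleanup(). The following pure-Java sketch models that pattern without HBase. `AtomicCounterStore` and `CountingTaskSim` are hypothetical names. The store's increment (`ConcurrentHashMap.merge`) only stands in for the atomic per-cell read-modify-write that an ICV is expected to provide.

Under that assumption, concurrently running tasks cannot lose updates, so c:n always equals the sum of the per-split columns. In the real table, a shortfall therefore means some total increments were either not applied or never issued.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for an atomic per-cell increment such as
// HTable.incrementColumnValue; it is NOT HBase, just a model of the contract.
final class AtomicCounterStore {
    private final ConcurrentHashMap<String, Long> cells = new ConcurrentHashMap<>();

    /** Atomically adds amount to column and returns the new value. */
    long increment(String column, long amount) {
        return cells.merge(column, amount, Long::sum);
    }

    long get(String column) {
        return cells.getOrDefault(column, 0L);
    }
}

final class CountingTaskSim {
    /** One simulated map task: count locally, then increment once per column. */
    static void runTask(AtomicCounterStore store, int splitIndex, int splitSize) {
        Map<String, Long> counts = new HashMap<>();
        String value = Integer.toString(splitIndex);
        for (int i = 0; i < splitSize; i++) {
            counts.merge(value, 1L, Long::sum); // same effect as the containsKey/get/put in map()
        }
        long total = 0;
        for (Map.Entry<String, Long> e : counts.entrySet()) { // the "cleanup" phase
            store.increment("c:" + e.getKey(), e.getValue());
            total += e.getValue();
        }
        store.increment("c:n", total);
    }

    /** Runs numSplits tasks concurrently against one shared store. */
    static AtomicCounterStore runJob(int numSplits, int splitSize, int threads)
            throws InterruptedException {
        AtomicCounterStore store = new AtomicCounterStore();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int s = 0; s < numSplits; s++) {
            final int split = s;
            pool.submit(() -> runTask(store, split, splitSize));
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish");
        }
        return store;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicCounterStore store = runJob(80, 1_200, 16);
        // With atomic increments the total equals numSplits * splitSize.
        System.out.println(store.get("c:0") + " " + store.get("c:n")); // 1200 96000
    }
}
```

The split size is scaled down so the simulation finishes quickly. The local `counts.merge(...)` call is a Java 8+ equivalent of the containsKey/get/put expression in the original map().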
>
>
>
> --
> Todd Lipcon
> Software Engineer, Cloudera
>
