hbase-user mailing list archives

From Alex Baranau <alex.barano...@gmail.com>
Subject Re: [ANN]: HBaseWD: Distribute Sequential Writes in HBase
Date Thu, 21 Apr 2011 15:32:51 GMT
https://issues.apache.org/jira/browse/HBASE-3811

Alex Baranau
----
Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch - Hadoop - HBase

On Thu, Apr 21, 2011 at 5:57 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> My plan was to make regions that have active scanners more stable, i.e. try
> not to move them when balancing.
> I prefer the second approach: adding custom attribute(s) to Scan so that the
> Scans created by the method below can be 'grouped'.
>
> If you can file a JIRA, that would be great.
>
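> Something along these lines is what I have in mind (just a sketch; there is
> no attribute API on Scan today, so setAttribute below is hypothetical):
>
>    // Hypothetical: tag all Scans belonging to one distributed scan with a
>    // shared group id, so the server side could treat them as one logical scan.
>    byte[] groupId = Bytes.toBytes(java.util.UUID.randomUUID().toString());
>    for (Scan scan : scans) {
>      scan.setAttribute("scan.group.id", groupId); // proposed, not an existing, API
>    }
>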
> On Thu, Apr 21, 2011 at 7:23 AM, Alex Baranau <alex.baranov.v@gmail.com> wrote:
>
> > Aha, so you want to "count" it as a single scan (or just differently) when
> > determining the load?
> >
> > The current code looks like this:
> >
> > In class DistributedScanner:
> >
> >  public static DistributedScanner create(HTable hTable, Scan original,
> >      AbstractRowKeyDistributor keyDistributor) throws IOException {
> >    // One sub-scan per bucket: translate the original start/stop keys.
> >    byte[][] startKeys = keyDistributor.getAllDistributedKeys(original.getStartRow());
> >    byte[][] stopKeys = keyDistributor.getAllDistributedKeys(original.getStopRow());
> >    Scan[] scans = new Scan[startKeys.length];
> >    for (byte i = 0; i < startKeys.length; i++) {
> >      scans[i] = new Scan(original);
> >      scans[i].setStartRow(startKeys[i]);
> >      scans[i].setStopRow(stopKeys[i]);
> >    }
> >
> >    // Each sub-scan is opened as an ordinary scanner against the table.
> >    ResultScanner[] rss = new ResultScanner[startKeys.length];
> >    for (byte i = 0; i < scans.length; i++) {
> >      rss[i] = hTable.getScanner(scans[i]);
> >    }
> >
> >    return new DistributedScanner(rss);
> >  }
> >
> > This is client code. To make these scans "identifiable" we need to either
> > use some different class (derived from Scan) or add some attribute to them.
> > There's no API for doing the latter. We can do the former, but I don't
> > really like the idea of creating an extra class (with no extra
> > functionality) just to distinguish it from the base one.
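> >
> > For illustration, the marker-class option would amount to just this (a
> > sketch, not part of the lib, with no extra behavior):
> >
> >    // Marker subclass: no extra functionality, exists only so that such
> >    // scans could be told apart from ordinary ones.
> >    public class DistributedScan extends Scan {
> >      public DistributedScan(Scan original) throws IOException {
> >        super(original); // Scan's copy constructor, as used in create() above
> >      }
> >    }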
> >
> > If you can share why/how you want to treat them differently on the server
> > side, that would be helpful.
> >
> > Alex Baranau
> > ----
> > Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch - Hadoop - HBase
> >
> > On Thu, Apr 21, 2011 at 4:58 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> >
> > > My request would be to make the distributed scan identifiable from the
> > > server side.
> > > :-)
> > >
> > > On Thu, Apr 21, 2011 at 5:45 AM, Alex Baranau <alex.baranov.v@gmail.com> wrote:
> > >
> > > > > Basically bucketsCount may not equal the number of regions for the
> > > > > underlying table.
> > > >
> > > > True: e.g. when there's only one region that holds the data of the whole
> > > > table (not many records in the table yet), a distributed scan will fire N
> > > > scans against the same region.
> > > > On the other hand, when there is a huge number of regions for a single
> > > > table, each scan can span multiple regions.
> > > >
> > > > > I need to deal with normal scan and "distributed scan" at server side.
> > > >
> > > > With the current implementation a "distributed" scan won't be recognized
> > > > as anything special on the server side: it will be ordinary scans. Though
> > > > the number of scans will increase, given that the typical situation is
> > > > "many regions for a single table", the scans of the same "distributed
> > > > scan" are unlikely to hit the same region.
> > > >
> > > > Not sure if I answered your questions here. Feel free to ask more ;)
> > > >
> > > > Alex Baranau
> > > > ----
> > > > Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch - Hadoop - HBase
> > > >
> > > > On Wed, Apr 20, 2011 at 2:10 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> > > >
> > > > > Alex:
> > > > > If you read this, you would know why I asked:
> > > > > https://issues.apache.org/jira/browse/HBASE-3679
> > > > >
> > > > > I need to deal with normal scan and "distributed scan" at server side.
> > > > > Basically bucketsCount may not equal the number of regions for the
> > > > > underlying table.
> > > > >
> > > > > Cheers
> > > > >
> > > > On Tue, Apr 19, 2011 at 11:11 PM, Alex Baranau <alex.baranov.v@gmail.com> wrote:
> > > > >
> > > > > > Hi Ted,
> > > > > >
> > > > > > We currently use this tool in the scenario where data is consumed by
> > > > > > MapReduce jobs, so we haven't tested the performance of a pure
> > > > > > "distributed scan" (i.e. N scans instead of 1) a lot. I expect it to
> > > > > > be close to simple scan performance, or maybe sometimes even faster,
> > > > > > depending on your data access patterns. E.g. in case you write
> > > > > > time-series data (sequential), which goes into a single region at a
> > > > > > time, and you then access the delta for further processing/analysis
> > > > > > (esp. from more than a single client), these scans are likely to hit
> > > > > > the same region or a couple of regions at a time, which may perform
> > > > > > worse compared to many scans hitting data that is much better spread
> > > > > > over the region servers.
> > > > > >
> > > > > > As for MapReduce jobs, the approach should not affect reading
> > > > > > performance at all: there are just bucketsCount times more splits and
> > > > > > hence bucketsCount times more Map tasks. In many cases this even
> > > > > > improves the overall performance of the MR job, since work is better
> > > > > > distributed over the cluster (esp. when the aim is to constantly
> > > > > > process the incoming delta, which usually resides in one or just a
> > > > > > couple of regions depending on the processing frequency).
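> > > > > >
> > > > > > Roughly, the split multiplication works like this (a sketch of the
> > > > > > idea only, not WdTableInputFormat's actual code):
> > > > > >
> > > > > >    // For each original split, create one split per bucket by
> > > > > >    // translating its start/stop keys with the key distributor.
> > > > > >    List<InputSplit> splits = new ArrayList<InputSplit>();
> > > > > >    for (InputSplit originalSplit : super.getSplits(context)) {
> > > > > >      TableSplit ts = (TableSplit) originalSplit;
> > > > > >      byte[][] starts = keyDistributor.getAllDistributedKeys(ts.getStartRow());
> > > > > >      byte[][] stops = keyDistributor.getAllDistributedKeys(ts.getEndRow());
> > > > > >      for (int i = 0; i < starts.length; i++) {
> > > > > >        splits.add(new TableSplit(ts.getTableName(), starts[i], stops[i],
> > > > > >            ts.getRegionLocation()));
> > > > > >      }
> > > > > >    }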
> > > > > >
> > > > > > If you can share details on your case, that will help to understand
> > > > > > what effect(s) to expect from using this approach.
> > > > > >
> > > > > > Alex Baranau
> > > > > > ----
> > > > > > Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch - Hadoop - HBase
> > > > > >
> > > > > > On Wed, Apr 20, 2011 at 8:17 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> > > > > >
> > > > > > > Interesting project, Alex.
> > > > > > > Since there are bucketsCount scanners compared to one scanner
> > > > > > > originally, have you performed load testing to see the impact?
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Tue, Apr 19, 2011 at 10:25 AM, Alex Baranau <alex.baranov.v@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello guys,
> > > > > > > >
> > > > > > > > I'd like to introduce a new small Java project/lib around HBase:
> > > > > > > > HBaseWD. It is aimed to help with distribution of the load
> > > > > > > > (across region servers) when writing sequential (because of the
> > > > > > > > row key nature) records. It implements the solution which was
> > > > > > > > discussed several times on this mailing list (e.g. here:
> > > > > > > > http://search-hadoop.com/m/gNRA82No5Wk).
> > > > > > > >
> > > > > > > > Please find the sources at https://github.com/sematext/HBaseWD
> > > > > > > > (there's also a jar of the current version for convenience). It
> > > > > > > > is very easy to make use of it: e.g. I added it to one existing
> > > > > > > > project with 1+2 lines of code (one line where I write to HBase
> > > > > > > > and two for configuring the MapReduce job).
> > > > > > > >
> > > > > > > > Any feedback is highly appreciated!
> > > > > > > >
> > > > > > > > Please find below a short intro to the lib [1].
> > > > > > > >
> > > > > > > > Alex Baranau
> > > > > > > > ----
> > > > > > > > Sematext :: http://sematext.com/ :: Solr - Lucene - Nutch - Hadoop - HBase
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > > Description:
> > > > > > > > ------------
> > > > > > > > HBaseWD stands for Distributing (sequential) Writes. It was
> > > > > > > > inspired by discussions on the HBase mailing lists around the
> > > > > > > > problem of choosing between:
> > > > > > > > * writing records with sequential row keys (e.g. time-series
> > > > > > > >   data with the row key built based on ts)
> > > > > > > > * using random unique IDs for records
> > > > > > > >
> > > > > > > > The first approach makes it possible to perform fast range scans
> > > > > > > > by setting start/stop keys on the Scanner, but creates a single
> > > > > > > > region server hot-spotting problem when writing data (as row keys
> > > > > > > > go in sequence, all records end up written into a single region
> > > > > > > > at a time).
> > > > > > > >
> > > > > > > > The second approach aims for the fastest writing performance by
> > > > > > > > distributing new records over random regions, but makes fast
> > > > > > > > range scans over the written data impossible.
> > > > > > > >
> > > > > > > > The suggested approach sits in the middle of the two above and
> > > > > > > > has proved to perform well: it distributes records over the
> > > > > > > > cluster during writes while still allowing range scans over them.
> > > > > > > > HBaseWD provides a very simple API to work with, which makes it
> > > > > > > > easy to use with existing code.
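> > > > > > > >
> > > > > > > > An illustration of the idea (hypothetical values, assuming e.g.
> > > > > > > > a round-robin one-byte prefix over 3 buckets):
> > > > > > > >
> > > > > > > >    // original keys:    ts1,      ts2,      ts3,      ts4
> > > > > > > >    // distributed keys: 0x00+ts1, 0x01+ts2, 0x02+ts3, 0x00+ts4
> > > > > > > >    // Writes spread over 3 regions; a range scan [ts1, ts4]
> > > > > > > >    // becomes 3 scans, one per prefix.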
> > > > > > > >
> > > > > > > > Please refer to the unit tests for lib usage info, as they are
> > > > > > > > aimed to act as examples.
> > > > > > > >
> > > > > > > > Brief Usage Info (Examples):
> > > > > > > > ----------------------------
> > > > > > > >
> > > > > > > > Distributing records with sequential keys which are written
> > > > > > > > into up to Byte.MAX_VALUE buckets:
> > > > > > > >
> > > > > > > >    byte bucketsCount = (byte) 32; // distributing into 32 buckets
> > > > > > > >    RowKeyDistributor keyDistributor =
> > > > > > > >        new RowKeyDistributorByOneBytePrefix(bucketsCount);
> > > > > > > >    for (int i = 0; i < 100; i++) {
> > > > > > > >      Put put = new Put(keyDistributor.getDistributedKey(originalKey));
> > > > > > > >      ... // add values
> > > > > > > >      hTable.put(put);
> > > > > > > >    }
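> > > > > > > >
> > > > > > > > When reading rows back, the original key can be restored from
> > > > > > > > the stored one (a sketch; result here stands for a Result
> > > > > > > > obtained from a scan, keyDistributor is the same instance):
> > > > > > > >
> > > > > > > >    // Strip the bucket prefix to get the user-visible key back.
> > > > > > > >    byte[] originalKey = keyDistributor.getOriginalKey(result.getRow());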
> > > > > > > >
> > > > > > > >
> > > > > > > > Performing a range scan over the written data (internally
> > > > > > > > <bucketsCount> scanners are executed):
> > > > > > > >
> > > > > > > >    Scan scan = new Scan(startKey, stopKey);
> > > > > > > >    ResultScanner rs = DistributedScanner.create(hTable, scan,
> > > > > > > >        keyDistributor);
> > > > > > > >    for (Result current : rs) {
> > > > > > > >      ...
> > > > > > > >    }
> > > > > > > >
> > > > > > > > Performing a MapReduce job over the data chunk specified by a
> > > > > > > > Scan:
> > > > > > > >
> > > > > > > >    Configuration conf = HBaseConfiguration.create();
> > > > > > > >    Job job = new Job(conf, "testMapreduceJob");
> > > > > > > >
> > > > > > > >    Scan scan = new Scan(startKey, stopKey);
> > > > > > > >
> > > > > > > >    TableMapReduceUtil.initTableMapperJob("table", scan,
> > > > > > > >      RowCounterMapper.class, ImmutableBytesWritable.class,
> > > > > > > >      Result.class, job);
> > > > > > > >
> > > > > > > >    // Substituting standard TableInputFormat which was set in
> > > > > > > >    // TableMapReduceUtil.initTableMapperJob(...)
> > > > > > > >    job.setInputFormatClass(WdTableInputFormat.class);
> > > > > > > >    keyDistributor.addInfo(job.getConfiguration());
> > > > > > > >
> > > > > > > >
> > > > > > > > Extending Row Keys Distributing Patterns:
> > > > > > > > -----------------------------------------
> > > > > > > >
> > > > > > > > HBaseWD is designed to be flexible and to support custom row key
> > > > > > > > distribution approaches. To define custom row key distribution
> > > > > > > > logic, just extend the AbstractRowKeyDistributor abstract class,
> > > > > > > > which is really very simple:
> > > > > > > >
> > > > > > > >    public abstract class AbstractRowKeyDistributor implements
> > > > > > > >        Parametrizable {
> > > > > > > >      public abstract byte[] getDistributedKey(byte[] originalKey);
> > > > > > > >      public abstract byte[] getOriginalKey(byte[] adjustedKey);
> > > > > > > >      public abstract byte[][] getAllDistributedKeys(byte[] originalKey);
> > > > > > > >      ... // some utility methods
> > > > > > > >    }
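> > > > > > > >
> > > > > > > > E.g. a distributor that picks the bucket from a hash of the
> > > > > > > > original key (so the same key always lands in the same bucket)
> > > > > > > > could look like this; just a sketch, not shipped with the lib,
> > > > > > > > with the Parametrizable utility methods omitted:
> > > > > > > >
> > > > > > > >    public class HashPrefixDistributor extends AbstractRowKeyDistributor {
> > > > > > > >      private final byte bucketsCount;
> > > > > > > >
> > > > > > > >      public HashPrefixDistributor(byte bucketsCount) {
> > > > > > > >        this.bucketsCount = bucketsCount;
> > > > > > > >      }
> > > > > > > >
> > > > > > > >      @Override
> > > > > > > >      public byte[] getDistributedKey(byte[] originalKey) {
> > > > > > > >        // Non-negative hash modulo bucketsCount gives the prefix byte.
> > > > > > > >        int hash = java.util.Arrays.hashCode(originalKey) & 0x7fffffff;
> > > > > > > >        return prepend((byte) (hash % bucketsCount), originalKey);
> > > > > > > >      }
> > > > > > > >
> > > > > > > >      @Override
> > > > > > > >      public byte[] getOriginalKey(byte[] adjustedKey) {
> > > > > > > >        // Drop the one-byte prefix.
> > > > > > > >        return java.util.Arrays.copyOfRange(adjustedKey, 1, adjustedKey.length);
> > > > > > > >      }
> > > > > > > >
> > > > > > > >      @Override
> > > > > > > >      public byte[][] getAllDistributedKeys(byte[] originalKey) {
> > > > > > > >        // All possible prefixed variants, used by scans and MR splits.
> > > > > > > >        byte[][] keys = new byte[bucketsCount][];
> > > > > > > >        for (byte i = 0; i < bucketsCount; i++) {
> > > > > > > >          keys[i] = prepend(i, originalKey);
> > > > > > > >        }
> > > > > > > >        return keys;
> > > > > > > >      }
> > > > > > > >
> > > > > > > >      private static byte[] prepend(byte prefix, byte[] key) {
> > > > > > > >        byte[] result = new byte[key.length + 1];
> > > > > > > >        result[0] = prefix;
> > > > > > > >        System.arraycopy(key, 0, result, 1, key.length);
> > > > > > > >        return result;
> > > > > > > >      }
> > > > > > > >    }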
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
