hbase-user mailing list archives

From "Buttler, David" <buttl...@llnl.gov>
Subject RE: How to efficiently join HBase tables?
Date Mon, 06 Jun 2011 20:30:15 GMT
So, you all realize that joins have been talked about in the database community for 40 years?
 There are two main types of joins:
Nested loops
Hash table

Mike, in his various emails seems to be trying to re-imagine how to implement both types of
joins in HBase (which seems like a reasonable goal). I am not exactly sure what Eran is going
for here, but it seems like Eran is glossing over a piece.  If you have two scanners for table
A and B, then table B needs to be rescanned for every unique part of the join condition in
table A.  There are certain ways of improving the efficiency of that: the two most obvious
are pushing the selection criteria down to the scans, and scanning all of the same join values
from table B at the same time (which requires that table B's key is the join key, or a secondary
structure that stores the join values in primary order).
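
The pushdown half is cheap to express with the client API. A minimal
sketch, assuming a single column family "cf" and the column names from the
sample query quoted below (all hypothetical); it keeps rows of table A that
fail A.c = xxx or A.d = yyy from ever leaving the region servers:

  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.filter.CompareFilter;
  import org.apache.hadoop.hbase.filter.FilterList;
  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  import org.apache.hadoop.hbase.util.Bytes;

  // Require A.c = xxx; rows missing column c should not match either.
  SingleColumnValueFilter cEq = new SingleColumnValueFilter(
      Bytes.toBytes("cf"), Bytes.toBytes("c"),
      CompareFilter.CompareOp.EQUAL, Bytes.toBytes("xxx"));
  cEq.setFilterIfMissing(true);
  // Require A.d = yyy.
  SingleColumnValueFilter dEq = new SingleColumnValueFilter(
      Bytes.toBytes("cf"), Bytes.toBytes("d"),
      CompareFilter.CompareOp.EQUAL, Bytes.toBytes("yyy"));
  dEq.setFilterIfMissing(true);
  // Both conditions must hold, evaluated server-side during the scan.
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  filters.addFilter(cEq);
  filters.addFilter(dEq);
  Scan scanA = new Scan();
  scanA.setFilter(filters);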

Dave

-----Original Message-----
From: eran@gigya-inc.com [mailto:eran@gigya-inc.com] On Behalf Of Eran Kutner
Sent: Friday, June 03, 2011 12:24 AM
To: user@hbase.apache.org
Subject: Re: How to efficiently join HBase tables?

Mike, this is more or less what I tried to describe in my initial post, only
you explained it much better.
The problem is that I want to do all of this in one M/R run, not 3 and
without explicit temp tables. If there was only a way to feed both table A
and table B into the M/R job then it could be done.

Let's take your query and assumptions, for example.
So we configure scanner A to return rows where c=xxx and d=yyy.
We then configure scanner B to return rows where e=zzz.
Now we feed all those rows to the mapper.
For each row the mapper gets, it outputs a new key, which is "a|b", with the
same value it received; if either field doesn't exist in the row the mapper
doesn't output anything for that row.
There is an implicit "temp table" created at this stage by Hadoop.
Now the reducer is run: for every key "a|b" generated by the mapper it would
get one or more value sets, each one representing a row from one of the
original two tables. For simplicity, let's assume we got two rows, one from
table A, the other from table B. Now the reducer can combine the two rows and
output the combined row. This will work just the same if there were multiple
rows from each table with the same "a|b" key; in that case the reducer would
have to generate the Cartesian product of all the rows. Outer joins can also
be done this way; in an outer join you may get rows from only one of the
tables for a given "a|b" key but still generate an output.

-eran



On Fri, Jun 3, 2011 at 00:05, Michael Segel <michael_segel@hotmail.com> wrote:

>
> Not to beat a dead horse, but I thought a bit more about the problem.
> If you want to do this all in HBase using a M/R job...
>
> Let's define the following sample query:
>
> SELECT *
> FROM A, B
> WHERE A.a = B.a
> AND   A.b = B.b
> AND   A.c = xxx
> AND   A.d = yyy
> AND   B.e = zzz
>
> So our join key is "a|b" because we're matching on columns a and b. (The
> pipe is to delimit the columns, assuming the columns themselves don't
> contain pipes...)
>
> Our filters on A are c and d while e is the filter on B.
>
> So we want to do the following:
>
> M/R Map job 1 gets the subset from table A along with a set of unique keys.
> M/R Map job 2 gets the subset from table B along with a set of unique keys.
> M/R Map job 3 takes either set of unique keys as the input list and you
> split it based on the number of parallel mappers you want to use.
>
> You have a couple of options on how you want to proceed.
> In each Mapper.map() your input is a unique key.
> I guess you could create two scanners, one for tempTableA, and one for
> tempTableB.
> It looks like you can get the iterator for each result set, and then for
> each row in temp table A, you iterate through the result set from temp table
> B, writing out the joined set.
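>
> As a sketch (conf and joinKey are in scope from the surrounding mapper;
> the nextKey() and emitJoined() helpers are hypothetical, with nextKey()
> returning the smallest row key greater than the join key):
>
>   HTable tempA = new HTable(conf, "tempTableA");
>   HTable tempB = new HTable(conf, "tempTableB");
>   // All rows in temp table A that carry the current unique join key:
>   ResultScanner rsA = tempA.getScanner(new Scan(joinKey, nextKey(joinKey)));
>   for (Result rowA : rsA) {
>     // Nested loop: rescan temp table B for the same join key.
>     ResultScanner rsB = tempB.getScanner(new Scan(joinKey, nextKey(joinKey)));
>     for (Result rowB : rsB) {
>       emitJoined(rowA, rowB);   // write out the combined row
>     }
>     rsB.close();
>   }
>   rsA.close();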
>
> The only problem is that your result set file isn't in sort order. So I
> guess you could take the output from this job and reduce it to get it into
> sort order.
>
> Option B. Using HDFS files for temp 'tables'.
> You can do this... but you would still have to track the unique keys and
> also sort both the keys and the files which will require a reduce job.
>
>
> Now this is just my opinion, but if I use HBase, I don't have to worry
> about using a reducer except to order the final output set.
> So I can save the time it takes to do the reduce step. So I have to ask...
> how much time is spent by HBase in splitting and compacting the temp tables?
> Also, can't you pre-split the temp tables before you use them?
>
> Or am I still missing something?
>
> Note: In this example, you'd have to write an input format that takes a
> Java List object (or something similar) as its input and then splits it
> so it can run in parallel.
> Or you could just write this on the client, split the list up, and run
> the join in parallel threads on the client node. Or use a single thread,
> which would mean it would run and output in sort order.
>
> HTH
>
> -Mike
>
> > Date: Wed, 1 Jun 2011 07:47:30 -0700
> > Subject: Re: How to efficiently join HBase tables?
> > From: jason.rutherglen@gmail.com
> > To: user@hbase.apache.org
> >
> > > you somehow need to flush all in-memory data *and* perform a
> > > major compaction
> >
> > This makes sense.  Without compaction the linear HDFS scan isn't
> > possible.  I suppose one could compact 'offline' in a different Map
> > Reduce job.  However, that would have its own issues.
> >
> > > The files do have a flag if they were made by a major compaction,
> > so you scan only those and ignore the newer ones - but then you are trailing
> >
> > This could be ok in many cases.  The key would be to create a sync'd
> > cut off point enabling a frozen point-in-time 'view' of the data.  I'm
> > not sure how that would be implemented.
> >
> > On Wed, Jun 1, 2011 at 6:54 AM, Lars George <lars.george@gmail.com> wrote:
> > > Hi Jason,
> > >
> > > This was discussed in the past, using the HFileInputFormat. The issue
> > > is that you somehow need to flush all in-memory data *and* perform a
> > > major compaction - or else you would need all the logic of the
> > > ColumnTracker in the HFIF. Since that needs to scan all storage files
> > > in parallel to achieve its job, the MR task would not really be able
> > > to use the same approach.
> > >
> > > Running a major compaction creates a lot of churn, so it is
> > > questionable what the outcome is. The files do have a flag if they
> > > were made by a major compaction, so you scan only those and ignore the
> > > newer ones - but then you are trailing, and you still do not handle
> > > delete markers/updates in newer files. No easy feat.
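> > >
> > > (The flush-plus-compact step itself is only a couple of admin calls - a
> > > sketch, with "myTable" hypothetical; note majorCompact() merely requests
> > > the compaction, so you would still have to wait for it to finish:)
> > >
> > >   HBaseAdmin admin = new HBaseAdmin(conf);
> > >   admin.flush("myTable");          // force MemStore contents into files
> > >   admin.majorCompact("myTable");   // asynchronous compaction request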
> > >
> > > Lars
> > >
> > > On Wed, Jun 1, 2011 at 2:41 AM, Jason Rutherglen
> > > <jason.rutherglen@gmail.com> wrote:
> > >>> I'd imagine that join operations do not require realtime-ness, and so
> > >>> faster batch jobs using Hive -> frozen HBase files in HDFS could be
> > >>> the optimal way to go?
> > >>
> > >> In addition to lessening the load on the perhaps live RegionServer.
> > >> There's no Jira for this, I'm tempted to open one.
> > >>
> > >> On Tue, May 31, 2011 at 5:18 PM, Jason Rutherglen
> > >> <jason.rutherglen@gmail.com> wrote:
> > >>>> The Hive-HBase integration allows you to create Hive tables that are
> > >>>> backed by HBase
> > >>>
> > >>> In addition, HBase can be made to go faster for MapReduce jobs, if the
> > >>> HFiles could be used directly in HDFS, rather than proxying through
> > >>> the RegionServer.
> > >>>
> > >>> I'd imagine that join operations do not require realtime-ness, and so
> > >>> faster batch jobs using Hive -> frozen HBase files in HDFS could be
> > >>> the optimal way to go?
> > >>>
> > >>> On Tue, May 31, 2011 at 1:41 PM, Patrick Angeles <patrick@cloudera.com> wrote:
> > >>>> On Tue, May 31, 2011 at 3:19 PM, Eran Kutner <eran@gigya.com> wrote:
> > >>>>
> > >>>>> For my need I don't really need the general case, but even if I did
> > >>>>> I think it can probably be done simpler.
> > >>>>> The main problem is getting the data from both tables into the same
> > >>>>> MR job, without resorting to lookups. So without the theoretical
> > >>>>> MultiTableInputFormat, I could just copy all the data from both
> > >>>>> tables into a temp table, appending the source table name to the row
> > >>>>> keys to make sure there are no conflicts. When all the data from
> > >>>>> both tables is in the same temp table, run a MR job. For each row
> > >>>>> the mapper should emit a key which is composed of all the values of
> > >>>>> the join fields in that row (the value can be emitted as is). This
> > >>>>> will cause all the rows from both tables with the same join field
> > >>>>> values to arrive at the reducer together. The reducer could then
> > >>>>> iterate over them and produce the Cartesian product as needed.
> > >>>>>
> > >>>>> I still don't like having to copy all the data into a temp table
> > >>>>> just because I can't feed two tables into the MR job.
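> > >>>>>
> > >>>>> The copy step itself would be trivial - a sketch (hypothetical names,
> > >>>>> with "result" being a row scanned from table A):
> > >>>>>
> > >>>>>   Put p = new Put(Bytes.add(Bytes.toBytes("A|"), result.getRow()));
> > >>>>>   for (KeyValue kv : result.raw()) {   // copy every cell unchanged
> > >>>>>     p.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
> > >>>>>   }
> > >>>>>   tempTable.put(p);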
> > >>>>>
> > >>>>
> > >>>> Loading the smaller table in memory is called a map join, versus a
> > >>>> reduce-side join (a.k.a. common join). One reason to prefer a map
> > >>>> join is that you avoid the shuffle phase, which potentially involves
> > >>>> several trips to disk for the intermediate records due to spills, and
> > >>>> also one trip through the network to get each intermediate KV pair to
> > >>>> the right reducer. With a map join, everything is local, except for
> > >>>> the part where you load the small table.
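> > >>>>
> > >>>> A minimal map-join sketch (hypothetical table/column names, joining
> > >>>> on the row key for brevity): each mapper loads the small table into a
> > >>>> HashMap in setup() and probes it per row, so no shuffle is needed.
> > >>>>
> > >>>>   import java.io.IOException;
> > >>>>   import java.util.HashMap;
> > >>>>   import java.util.Map;
> > >>>>   import org.apache.hadoop.hbase.client.*;
> > >>>>   import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > >>>>   import org.apache.hadoop.hbase.mapreduce.TableMapper;
> > >>>>   import org.apache.hadoop.hbase.util.Bytes;
> > >>>>   import org.apache.hadoop.io.Text;
> > >>>>
> > >>>>   public class MapJoinMapper extends TableMapper<Text, Text> {
> > >>>>     private final Map<String, String> small =
> > >>>>         new HashMap<String, String>();
> > >>>>
> > >>>>     @Override
> > >>>>     protected void setup(Context ctx) throws IOException {
> > >>>>       HTable b = new HTable(ctx.getConfiguration(), "tableB");
> > >>>>       ResultScanner rs = b.getScanner(new Scan());
> > >>>>       for (Result r : rs)        // cache the small table in memory
> > >>>>         small.put(Bytes.toString(r.getRow()),
> > >>>>                   Bytes.toString(r.getValue(Bytes.toBytes("cf"),
> > >>>>                                             Bytes.toBytes("e"))));
> > >>>>       rs.close();
> > >>>>       b.close();
> > >>>>     }
> > >>>>
> > >>>>     @Override
> > >>>>     protected void map(ImmutableBytesWritable row, Result r,
> > >>>>                        Context ctx)
> > >>>>         throws IOException, InterruptedException {
> > >>>>       String hit = small.get(Bytes.toString(r.getRow()));  // probe
> > >>>>       if (hit != null)           // emit only the rows that join
> > >>>>         ctx.write(new Text(Bytes.toString(r.getRow())), new Text(hit));
> > >>>>     }
> > >>>>   }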
> > >>>>
> > >>>>
> > >>>>>
> > >>>>> As Jason Rutherglen mentioned above, Hive can do joins. I don't know
> > >>>>> if it can do them for HBase and it will not suit my needs, but it
> > >>>>> would be interesting to know how it is doing them, if anyone knows.
> > >>>>>
> > >>>>
> > >>>> The Hive-HBase integration allows you to create Hive tables that are
> > >>>> backed by HBase. You can do joins on those tables (and also with
> > >>>> standard Hive tables). It might be worth trying out in your case as
> > >>>> it lets you easily see the load characteristics and the job runtime
> > >>>> without much coding investment.
> > >>>>
> > >>>> There are probably some specific optimizations that can be applied
> > >>>> to your situation, but it's hard to say without knowing your use-case.
> > >>>>
> > >>>> Regards,
> > >>>>
> > >>>> - Patrick
> > >>>>
> > >>>>
> > >>>>> -eran
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, May 31, 2011 at 22:02, Ted Dunning <tdunning@maprtech.com> wrote:
> > >>>>>
> > >>>>> > The Cartesian product often makes an honest-to-god join not such
> > >>>>> > a good idea on large data.  The common alternative is co-group,
> > >>>>> > which is basically like doing the hard work of the join, but
> > >>>>> > involves stopping just before emitting the Cartesian product.
> > >>>>> > This allows you to inject whatever cleverness you need at this
> > >>>>> > point.
> > >>>>> >
> > >>>>> > Common kinds of cleverness include down-sampling of
> > >>>>> > problematically large sets of candidates.
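> > >>>>> >
> > >>>>> > In M/R terms the co-group is just the reducer holding the grouped
> > >>>>> > lists without crossing them; down-sampling is then a small guard
> > >>>>> > before the product, e.g. (a sketch with made-up constants, where
> > >>>>> > fromA/fromB are the per-key row lists from each table):
> > >>>>> >
> > >>>>> >   if ((long) fromA.size() * fromB.size() > MAX_PAIRS) {
> > >>>>> >     fromA = fromA.subList(0, Math.min(fromA.size(), SAMPLE));
> > >>>>> >     fromB = fromB.subList(0, Math.min(fromB.size(), SAMPLE));
> > >>>>> >   }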
> > >>>>> >
> > >>>>> > On Tue, May 31, 2011 at 11:56 AM, Michael Segel
> > >>>>> > <michael_segel@hotmail.com> wrote:
> > >>>>> >
> > >>>>> > > So the underlying problem that the OP was trying to solve was
> > >>>>> > > how to join two tables from HBase.
> > >>>>> > > Unfortunately I goofed.
> > >>>>> > > I gave a quick and dirty solution that is a bit incomplete. The
> > >>>>> > > row key in the temp table has to be unique and I forgot about
> > >>>>> > > the Cartesian product. So my solution wouldn't work in the
> > >>>>> > > general case.
> > >>>>> > >
> > >>>>> >
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
>
>
