accumulo-dev mailing list archives

From Josh Elser <>
Subject Re: SQL layer over Accumulo?
Date Wed, 30 Apr 2014 15:37:21 GMT

On 4/30/14, 3:33 AM, James Taylor wrote:
> On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <> wrote:
>> @Josh - it's less baked in than you'd think on the client where the query
>>> parsing, compilation, optimization, and orchestration occurs. The
>>> client/server interaction is hidden behind the ConnectionQueryServices
>>> interface, the scanning behind ResultIterator (in
>>> particular ScanningResultIterator), the DML behind MutationState, and
>>> KeyValue interaction behind KeyValueBuilder. Yes, though, it would require
>>> some more abstraction, but probably not too bad, though. On the
>>> server-side, the entry points would all be different and that's where I'd
>>> need your insights for what's possible.
>> Definitely. I'm a little concerned about what's expected to be provided by
>> the "database" (HBase, Accumulo) as I believe HBase is a little more
>> flexible in allowing writes internally where Accumulo has thus far said
>> "you're gonna have a bad time".
> Tell me more about what you mean by "allowing writes internally".

Haha, sorry, that was a sufficiently ominous statement with insufficient
explanation.

For the sake of discussion, let's just say HBase coprocessors and Accumulo
iterators are equivalent, purely in the scope of "running server-side
code" (in the RegionServer/TabletServer). However, there is a notable
difference in where in the pipeline each of those is implemented.

Coprocessors have built-in hooks that let you get updates on 
PUT/GET/DELETE/etc as well as pre and post each of those operations. In 
other words, they provide hooks at a "high database level".

Iterators tend to be much closer to the data itself, only dealing with 
streams of data (other iterators stacked on one another). Iterators 
implement versioning, visibilities, and can even implement complex 
searches. The downside of this approach is that iterators lack any means 
to safely write data _outside of the sorted Key-Value pairs in the 
tablet currently being processed_. It's possible to make in-tablet
updates, but maintaining sorted order within a large tablet might make
this difficult as well.

This is why I was thinking Percolator would be a better solution, as
it's meant for handling updates like this server-side. However, I 
imagine it would be possible, in the short-term, to make some separate 
process between Phoenix and Accumulo which handles writes.

>>   @Eric - I agree about having txn support (probably through snapshot
>>> isolation) by controlling the timestamp, and then layering indexing on top
>>> of that. That's where we're headed. But I wouldn't let that stop the
>>> effort
>>> - it would just be layered on top of what's already there. FWIW, there's
>>> another interesting indexing model that has been termed "local indexing"(
>>> which is being worked on right
>>> now
>>> (should be available in either our 4.1 or 4.2 release). In this model, the
>>> table data and index data are co-located on the same region server through
>>> a kind of "buddy" region mechanism. The advantage is that you take no hit
>>> at write time, as you're writing both the index and table data together.
>>> Not sure how/if this would transfer over to the Accumulo world.
>> Interesting. Given that Accumulo doesn't have a fixed column family
>> schema, this might make index generation even easier (maybe "cleaner" is
>> the proper word). You could easily co-locate the indices with the data,
>> given a proper name.
> With HBase, you can do something similar (though, you're right, you'd need
> to create the column family upfront or take the hit of creating it
> dynamically - that's a nice feature that Accumulo has). The reason this
> doesn't work is that you need a different row key so that the index rows
> are ordered according to their indexed column values. If you put it in a
> column family of the data table, they're ordered in the same way as the
> data table. This makes range scans over index tables very expensive, as the
> rows would need to be re-ordered.

Ah, of course. You need the term up front to make it sort properly.
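
To make the ordering point concrete, here's a rough sketch in plain Java. A TreeMap stands in for a sorted table, and the class name, method names, and the NUL separator are all made up for illustration; none of this is Phoenix, HBase, or Accumulo API. Putting the indexed value at the front of the index row key is what turns "find rows by value" into a contiguous range scan:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IndexKeyOrderSketch {
    // Hypothetical separator between the indexed value and the data row key.
    static final char SEP = '\0';

    // Builds an index "table": a sorted map keyed by value + SEP + dataRowKey.
    static TreeMap<String, String> buildIndex(Map<String, String> dataTable) {
        TreeMap<String, String> index = new TreeMap<>();
        for (Map.Entry<String, String> e : dataTable.entrySet()) {
            index.put(e.getValue() + SEP + e.getKey(), e.getKey());
        }
        return index;
    }

    // Range scan over the index: data row keys whose indexed value sorts in [lo, hi).
    static List<String> rowsWithValueBetween(TreeMap<String, String> index, String lo, String hi) {
        return new ArrayList<>(index.subMap(lo, true, hi, false).values());
    }

    public static void main(String[] args) {
        Map<String, String> data = new TreeMap<>();
        data.put("row1", "pear");
        data.put("row2", "apple");
        data.put("row3", "banana");
        // Values starting with "a" or "b" come back in value order, not row order.
        System.out.println(rowsWithValueBetween(buildIndex(data), "a", "c"));
    }
}
```

If the index entries lived under the data table's own row keys instead, the same lookup would have to visit every row.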

>> Problem still exists that we don't have a solid way to do this solely
>> inside of Accumulo ATM. I'd imagine that if someone stepped up to implement
>> coprocessors, we'd be taking the route of a separate, standalone process
>> (as opposed to in-RegionServer). Hypothetically, we could do the same for
>> Phoenix in the short-term.
>> Can you quantify what would be expected by Accumulo to integrate with
>> Phoenix (maybe list what exactly is done inside of HBase at a high level?)
>> so that we could give some more targeted ideas/feelings as to what the
>> level of work would be inside Accumulo?
> There's not a lot of hard/fast requirements. Most of what Phoenix does is
> to optimize performance by leveraging the capabilities of the server. In
> terms of hard/fast requirements, these come to mind:
> - data is returned in row key order from range scans
> - a scan may set a start key/stop key to do a range scan
> - a row key may be composed of arbitrary bytes
> - a client may "pre-split" a table by providing the region boundaries at
> table create time (we rely on this for salting to prevent hotspotting:
> - the client has access to the region boundaries of a table (this allows
> for better parallelization)
> - the client may chunk up a scan into smaller, multiple scans and run
> them in parallel
> Some of these may be a bit squishy, as there may be existing machinery
> already in your client programming model that could be leveraged. The client
> API of HBase, for example, does not provide the ability out of the box to
> parallelize a scan, so this is something Phoenix had to add on top (through
> chunking up scans at or within region boundaries).

All of these look fine. The Accumulo BatchScanner does that 
parallelization for you which is really nice (handling tablet migration 
and all that fun stuff transparently).
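
For anyone following along, the chunking James describes looks roughly like the sketch below. It is plain Java with a TreeMap standing in for a table and a list of strings standing in for region/tablet boundaries; the names are hypothetical and it uses neither the HBase client nor the BatchScanner. It only shows the idea of cutting one range scan at the split points and running the pieces in parallel:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ChunkedScanSketch {
    // Scans row keys in [start, stop), cut at the split points (assumed sorted ascending).
    static List<String> parallelScan(NavigableMap<String, String> table, List<String> splits,
                                     String start, String stop) {
        // Chunk boundaries: start, each split strictly inside (start, stop), then stop.
        List<String> bounds = new ArrayList<>();
        bounds.add(start);
        for (String s : splits) {
            if (s.compareTo(start) > 0 && s.compareTo(stop) < 0) bounds.add(s);
        }
        bounds.add(stop);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int i = 0; i + 1 < bounds.size(); i++) {
                final String lo = bounds.get(i);
                final String hi = bounds.get(i + 1);
                Callable<List<String>> chunk =
                        () -> new ArrayList<>(table.subMap(lo, true, hi, false).keySet());
                futures.add(pool.submit(chunk));
            }
            // Chunks are collected in boundary order, so the result stays in row key order.
            List<String> result = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                try {
                    result.addAll(f.get());
                } catch (Exception e) {
                    throw new IllegalStateException("chunk scan failed", e);
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        for (String k : new String[] {"a", "b", "c", "d", "e", "f"}) table.put(k, "v-" + k);
        System.out.println(parallelScan(table, Arrays.asList("c", "e"), "b", "f"));
    }
}
```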

> Phoenix manages the metadata of your tables (tables, columns, indexes,
> views, etc) in an HBase table. DDL statements such as CREATE TABLE, DROP
> TABLE, ALTER TABLE are atomic, transactional operations b/c we don't want
> our metadata table to get in a corrupt state. To accomplish this, we rely
> on:
> - setting a "split policy" that ensures that the table data for a given
> "tenant" (we support multi-tenancy:
> stay together in
> the same region.
> - putting the data using an API that guarantees that either the entire
> batch of mutations succeed or fail completely.
> Again, these are details of our implementation on HBase which do not
> necessarily need to be implemented in the same way on a different system.

I'd have to look again at how our mutation failures are handled (or
someone else can chime in). This might be something to keep an eye on,
depending on how the mutations are distributed across tables.

> Phoenix supports sequences which are atomically incremented values. This is
> done through a coprocessor currently, due to some limitations with the
> HBase Increment API, but the idea is the same as an atomic increment.

Conditional Mutations in the about-to-be-released version 1.6.0 will 
provide this.
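
As a rough illustration of why a conditional write is enough for a sequence: read the current value, then write value + 1 only if the cell still holds what you read, and retry otherwise. The sketch below uses a ConcurrentHashMap as a stand-in for the server-side check. It is not the Accumulo conditional mutation API, and the class and method names are invented:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SequenceSketch {
    // Sequence name -> last value handed out.
    private final ConcurrentMap<String, Long> cells = new ConcurrentHashMap<>();

    // Returns the next value of the named sequence, starting at 1.
    long next(String name) {
        while (true) {
            Long current = cells.get(name);
            if (current == null) {
                // Conditional write: succeeds only if the cell is still absent.
                if (cells.putIfAbsent(name, 1L) == null) return 1L;
            } else {
                long proposed = current + 1;
                // Conditional write: succeeds only if the cell still holds the value we read.
                if (cells.replace(name, current, proposed)) return proposed;
            }
            // The condition failed because another writer got there first; re-read and retry.
        }
    }

    public static void main(String[] args) {
        SequenceSketch seq = new SequenceSketch();
        System.out.println(seq.next("orders"));
        System.out.println(seq.next("orders"));
    }
}
```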

> Phoenix does the following push down:
> - the WHERE clause gets transformed into three things: a start/stop key of
> a scan, a skip scan filter to efficiently navigate the key space (see
> and a custom filter to rule out a row based on some java code that does
> expression evaluation.
> - the GROUP BY clause gets pushed to the server and a coprocessor runs the
> scan on each region so that the client doesn't have to get back all the raw
> data. Instead, the client gets back the aggregated data (to conserve
> network bandwidth and to run the scan where the data lives). The client
> then does a final merge sort.

I've written an iterator to do a group by previously. Depending on the 
schema this is fine.
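
The shape of it, as a minimal sketch: each tablet/region produces partial aggregates and the client merges them. This is plain Java with a COUNT(*) aggregate picked for simplicity, and the method names are hypothetical. It is not an actual iterator or coprocessor:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupBySketch {
    // "Server side": count rows per group within a single tablet/region.
    static Map<String, Long> partialCounts(List<String> groupKeysInTablet) {
        Map<String, Long> counts = new TreeMap<>();
        for (String g : groupKeysInTablet) counts.merge(g, 1L, Long::sum);
        return counts;
    }

    // "Client side": combine the per-tablet partial results into final counts.
    static Map<String, Long> mergeCounts(List<Map<String, Long>> partials) {
        Map<String, Long> total = new TreeMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((g, n) -> total.merge(g, n, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> t1 = partialCounts(Arrays.asList("a", "b", "a"));
        Map<String, Long> t2 = partialCounts(Arrays.asList("b", "c"));
        System.out.println(mergeCounts(Arrays.asList(t1, t2)));
    }
}
```

Only one small map per tablet crosses the network, rather than every raw row.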

> - the ORDER BY clause used in combination with the LIMIT clause is a TopN
> query. We optimize this by each region holding on to the top N values with
> the client then doing a merge sort with the limit applied.

This is an interesting one. If you remove the possibility of tablets 
splitting out from underneath you and you had a view of the splits, you 
could probably pull it off.
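
Assuming a stable view of the splits, the TopN idea is roughly the following. It is a plain Java sketch with hypothetical names, and a simple sort on the client stands in for the streaming merge sort James mentions. Each tablet keeps only its own top N, so the client never sees more than N values per tablet:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class TopNSketch {
    // "Server side": keep only the n largest values in one tablet, using a min-heap of size n.
    static List<Integer> topNInTablet(List<Integer> values, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (int v : values) {
            heap.add(v);
            if (heap.size() > n) heap.poll(); // drop the current smallest
        }
        List<Integer> top = new ArrayList<>(heap);
        top.sort(Collections.reverseOrder());
        return top;
    }

    // "Client side": combine the per-tablet candidates and apply the limit.
    static List<Integer> mergeTopN(List<List<Integer>> perTablet, int n) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> t : perTablet) all.addAll(t);
        all.sort(Collections.reverseOrder());
        return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
    }

    public static void main(String[] args) {
        List<Integer> t1 = topNInTablet(Arrays.asList(5, 1, 9, 3), 2);
        List<Integer> t2 = topNInTablet(Arrays.asList(8, 7, 2), 2);
        System.out.println(mergeTopN(Arrays.asList(t1, t2), 2));
    }
}
```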

> - the ORDER BY clause on its own gets executed on each region (spooled
> using memory mapped files) and then the client does a merge sort. This
> spooling could potentially be done on the client side.

Unless we can do some trickery with the schema, yeah, client side.

> - joins are executed as a broadcast hash join. We run one side of the query
> (with the filters applied), compact the results, and send them to each
> region server where they are cached while we run the other side of the
> query. A coprocessor then does a map lookup (equi-joins only are supported
> currently) to join based on the join key and returns the joined results
> (i.e. the concatenated values in a single, condensed key value as access
> from the client is positional post-join).

The join approach would need to be implemented some other way, given the
differences between iterators and coprocessors described earlier.
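
For reference, the technique itself (independent of where it ends up running) is a build-then-probe hash join. Here is a minimal sketch in plain Java, with rows modelled as {joinKey, value} string pairs and all names invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BroadcastHashJoinSketch {
    // Builds the hash table from the smaller, already filtered side: join key -> values.
    // In the description above, this is the part that gets sent to every server and cached.
    static Map<String, List<String>> build(List<String[]> smallSide) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] row : smallSide) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return table;
    }

    // Probes the hash table with one partition of the larger side (equi-join only).
    static List<String> probe(Map<String, List<String>> table, List<String[]> partition) {
        List<String> joined = new ArrayList<>();
        for (String[] row : partition) {
            List<String> matches = table.getOrDefault(row[0], Collections.emptyList());
            for (String match : matches) {
                joined.add(row[0] + "|" + row[1] + "|" + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, List<String>> small = build(Arrays.asList(
                new String[] {"k1", "x"}, new String[] {"k2", "y"}));
        System.out.println(probe(small, Arrays.asList(
                new String[] {"k1", "a"}, new String[] {"k3", "b"})));
    }
}
```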

> For our global secondary indexes (local secondary indexes are different as
> we discussed already), we trap updates to the data table through a
> coprocessor. For index maintenance you need to know when a change occurs to
> a data row what the prior value of the row was. The reason is because you
> need to delete the index row corresponding to the old data row and then
> insert the index row corresponding to the new value (remember, the index
> value makes up the row key). By doing this operation through a coprocessor,
> we know that we can get the prior data row state locally. We still need to
> issue a Put from one region server to another, but this isn't really an
> extra hop, as if it was done on the client, the same hop would need to be
> done (but the old row state would need to be pulled over to the client
> which is not necessary with the coprocessor based approach). For more on
> global secondary indexing, see
> (there are some
> good presentations at the end of the page that provide more technical
> detail).

Right, you want to remove the old index entry and insert the new index
entry (two distinct keys) in the same transaction to ensure a valid
index. Or, at the very least, ensure that you never remove the old entry
and then die before inserting the new one.

Again, not going to work well in an iterator.
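
To spell out the ordering concern, here's a toy sketch in plain Java: two TreeMaps stand in for the data and index tables, the names are hypothetical, and there is no real atomicity. The new index entry is written before the stale one is removed, so a crash in between leaves an extra index entry rather than a missing one:

```java
import java.util.TreeMap;

final class IndexMaintenanceSketch {
    final TreeMap<String, String> data = new TreeMap<>();   // data row key -> indexed value
    final TreeMap<String, String> index = new TreeMap<>();  // value + NUL + row key -> row key

    static String indexKey(String value, String rowKey) {
        return value + '\0' + rowKey;
    }

    // Writes a new value for rowKey and keeps the index in step with it.
    // Not atomic: a real system still has to handle a crash between the steps.
    void put(String rowKey, String newValue) {
        String oldValue = data.put(rowKey, newValue);     // the prior row state is required
        index.put(indexKey(newValue, rowKey), rowKey);    // insert the new index entry first
        if (oldValue != null && !oldValue.equals(newValue)) {
            index.remove(indexKey(oldValue, rowKey));     // then delete the stale entry
        }
    }

    public static void main(String[] args) {
        IndexMaintenanceSketch t = new IndexMaintenanceSketch();
        t.put("row1", "pear");
        t.put("row1", "apple");
        System.out.println(t.index.size() + " index entry for " + t.index.values());
    }
}
```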

> Phoenix also allows "point-in-time" queries where a client may establish a
> connection at an earlier timestamp. If your table is setup to keep multiple
> versions of the same row, then you can query "back-in-time" and will see
> the data as it was at that point. We more or less get this for free with
> the MVCC model of HBase by specifying a max timestamp on a scan. One
> slightly tricky bit is we correlate the current DDL of your table based on
> the same timestamp as with your data. So when you go back-in-time like
> this, you'll also see the structure of your table as it was at that time.

I don't see this as a problem. As long as we remove the versioning 
iterator from a table (which keeps the most recent version of a key by 
default), it should be pretty easy to implement an iterator which 
adheres to the "max timestamp" semantics.
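
The "max timestamp" semantics amount to picking, per key, the newest version at or before the requested time. A minimal sketch in plain Java follows; the names are hypothetical, this is not an actual Accumulo iterator, and treating the bound as inclusive is an assumption:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PointInTimeSketch {
    // Row key -> (timestamp -> value); every version is kept.
    private final Map<String, NavigableMap<Long, String>> rows = new TreeMap<>();

    void put(String row, long ts, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>()).put(ts, value);
    }

    // Returns the newest value with timestamp <= maxTs, or null if the row had none yet.
    String getAsOf(String row, long maxTs) {
        NavigableMap<Long, String> versions = rows.get(row);
        if (versions == null) return null;
        Map.Entry<Long, String> e = versions.floorEntry(maxTs);
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        PointInTimeSketch t = new PointInTimeSketch();
        t.put("row1", 10L, "old");
        t.put("row1", 20L, "new");
        System.out.println(t.getAsOf("row1", 15L));
    }
}
```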

> So we do rely on coprocessors, but the underlying APIs we're accessing on
> the server-side are pretty light.
>   TLDR? Let's continue in the JIRA?
>> Mailing list is fine by me for now while we get this hashed out :). We can
>> move to Jira when we start getting into specifics.
