accumulo-dev mailing list archives

From James Taylor <jamestay...@apache.org>
Subject Re: SQL layer over Accumulo?
Date Thu, 01 May 2014 06:24:38 GMT
Thanks for the explanations, Josh. This sounds very doable. A few more
comments inline below.

James


On Wed, Apr 30, 2014 at 8:37 AM, Josh Elser <josh.elser@gmail.com> wrote:

>
>
> On 4/30/14, 3:33 AM, James Taylor wrote:
>
>> On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <josh.elser@gmail.com>
>> wrote:
>>
>>  @Josh - it's less baked in than you'd think on the client where the query
>>>
>>>> parsing, compilation, optimization, and orchestration occurs. The
>>>> client/server interaction is hidden behind the ConnectionQueryServices
>>>> interface, the scanning behind ResultIterator (in
>>>> particular ScanningResultIterator), the DML behind MutationState, and
>>>> KeyValue interaction behind KeyValueBuilder. Yes, it would require
>>>> some more abstraction, but probably not too bad. On the
>>>> server-side, the entry points would all be different and that's where
>>>> I'd
>>>> need your insights for what's possible.
>>>>
>>>>
>>> Definitely. I'm a little concerned about what's expected to be provided
>>> by
>>> the "database" (HBase, Accumulo) as I believe HBase is a little more
>>> flexible in allowing writes internally where Accumulo has thus far said
>>> "you're gonna have a bad time".
>>>
>>
>>
>> Tell me more about what you mean by "allowing writes internally".
>>
>
> Haha, sorry, that was a sufficiently ominous statement with insufficient
> context.
>
> For discussion's sake, let's just say HBase coprocessors and Accumulo
> iterators are equivalent, purely in the scope of "running server-side code"
> (in the RegionServer/TabletServer). However, there is a notable difference
> in the pipeline where each of those are implemented.
>
> Coprocessors have built-in hooks that let you get updates on
> PUT/GET/DELETE/etc., as well as pre- and post-hooks for each of those operations. In
> other words, they provide hooks at a "high database level".
>
> Iterators tend to be much closer to the data itself, only dealing with
> streams of data (other iterators stacked on one another). Iterators
> implement versioning, visibilities, and can even implement complex
> searches. The downside of this approach is that iterators lack any means to
> safely write data _outside of the sorted Key-Value pairs in the tablet
> currently being processed_. It's possible to make in-tablet updates, but
> maintaining sorted order within a large tablet might make this difficult
> as well.
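
For concreteness, a minimal filter-style iterator makes the "stream of
data" point visible. This is just a sketch against the public iterator
API; the class name and row prefix are made up:

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Filter;

    // An iterator never sees "a row" as a unit - only the sorted stream
    // of key/value pairs flowing through the tablet server.
    public class PrefixFilter extends Filter {
      @Override
      public boolean accept(Key k, Value v) {
        // keep only entries whose row starts with a made-up prefix
        return k.getRow().toString().startsWith("idx_");
      }
    }
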
>
> This is why I was thinking Percolator would be a better solution, as it's
> meant for handling updates like this server-side. However, I imagine it
> would be possible, in the short-term, to make some separate process between
> Phoenix and Accumulo which handles writes.


Another fallback might be to do global index maintenance on the client.
It'd just be more expensive, especially if you want to handle out-of-order
updates (which are particularly tricky, as you have to get multiple
versions of the rows to work out all the different scenarios here).

A second fallback might be to support only local indexing. Does Accumulo
have the concept of a "custom load balancer" that would allow you to
co-locate two regions from different tables? The local-index feature has
driven some feature requests on that front for HBase - mainly callbacks
when a region is split or relocated. The rows of the local index
are prefixed with the region start key to keep them together and identify
them.
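
In code, the row key construction is roughly this (a sketch; the names
are made up):

    // A local-index row key is the region/tablet start key followed by
    // the indexed column value, so index entries sort - and stay - next
    // to the data region they index.
    static byte[] localIndexRowKey(byte[] regionStartKey, byte[] indexedValue) {
      byte[] rowKey = new byte[regionStartKey.length + indexedValue.length];
      System.arraycopy(regionStartKey, 0, rowKey, 0, regionStartKey.length);
      System.arraycopy(indexedValue, 0, rowKey, regionStartKey.length,
          indexedValue.length);
      return rowKey;
    }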

>
>
>
>>
>>>
>>>   @Eric - I agree about having txn support (probably through snapshot
>>>
>>>> isolation) by controlling the timestamp, and then layering indexing on
>>>> top
>>>> of that. That's where we're headed. But I wouldn't let that stop the
>>>> effort
>>>> - it would just be layered on top of what's already there. FWIW, there's
>>>> another interesting indexing model that has been termed "local
>>>> indexing"(
>>>> https://github.com/Huawei-Hadoop/hindex) which is being worked on right
>>>> now
>>>> (should be available in either our 4.1 or 4.2 release). In this model,
>>>> the
>>>> table data and index data are co-located on the same region server
>>>> through
>>>> a kind of "buddy" region mechanism. The advantage is that you take no
>>>> hit
>>>> at write time, as you're writing both the index and table data together.
>>>> Not sure how/if this would transfer over to the Accumulo world.
>>>>
>>>>
>>> Interesting. Given that Accumulo doesn't have a fixed column family
>>> schema, this might make index generation even easier (maybe "cleaner" is
>>> the proper word). You could easily co-locate the indices with the data,
>>> giving them a proper name.
>>>
>>>
>> With HBase, you can do something similar (though, you're right, you'd need
>> to create the column family upfront or take the hit of creating it
>> dynamically - that's a nice feature that Accumulo has). The reason this
>> doesn't work is that you need a different row key so that the index rows
>> are ordered according to their indexed column values. If you put it in a
>> column family of the data table, they're ordered in the same way as the
>> data table. This makes range scans over index tables very expensive, as
>> the
>> rows would need to be re-ordered.
>>
>>
> Ah, of course. You need the term up front to make it sort properly.
>
>
>
>>> The problem still exists that we don't have a solid way to do this solely
>>> inside of Accumulo ATM. I'd imagine that if someone stepped up to
>>> implement
>>> coprocessors, we'd be taking the route of a separate, standalone process
>>> (as opposed to in-RegionServer). Hypothetically, we could do the same for
>>> Phoenix in the short-term.
>>>
>>> Can you quantify what would be expected by Accumulo to integrate with
>>> Phoenix (maybe list what exactly is done inside of HBase at a high
>>> level?)
>>> so that we could give some more targeted ideas/feelings as to what the
>>> level of work would be inside Accumulo?
>>>
>>
>>
>> There's not a lot of hard/fast requirements. Most of what Phoenix does is
>> to optimize performance by leveraging the capabilities of the server. In
>> terms of hard/fast requirements, these come to mind:
>> - data is returned in row key order from range scans
>> - a scan may set a start key/stop key to do a range scan
>> - a row key may be composed of arbitrary bytes
>> - a client may "pre-split" a table by providing the region boundaries at
>> table create time (we rely on this for salting to prevent hotspotting:
>> http://phoenix.incubator.apache.org/salted.html).
>> - the client has access to the region boundaries of a table (this allows
>> for better parallelization)
>> - the client may chunk up a scan into multiple, smaller scans and run
>> them in parallel
>> Some of these may be a bit squishy, as there may be existing machinery
>> already in your client programming model that could be leveraged. The
>> client
>> API of HBase, for example, does not provide the ability out of the box to
>> parallelize a scan, so this is something Phoenix had to add on top
>> (through
>> chunking up scans at or within region boundaries).
>>
>
> All of these look fine. The Accumulo BatchScanner does that
> parallelization for you, which is really nice (handling tablet migration and
> all that fun stuff transparently).
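
If I'm reading the javadocs right, the usage is roughly this (a sketch;
Connector setup is omitted, and the table name, ranges, and thread
count are made up):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.BatchScanner;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    // Hand the BatchScanner a set of ranges and a thread count; it
    // parallelizes the lookups and follows tablet migrations itself.
    // Note: entries come back unordered across ranges.
    void parallelScan(Connector conn) throws Exception {
      List<Range> ranges = new ArrayList<Range>();
      ranges.add(new Range("a", "f"));
      ranges.add(new Range("m", "q"));
      BatchScanner bs = conn.createBatchScanner("mytable",
          new Authorizations(), 10);
      try {
        bs.setRanges(ranges);
        for (Entry<Key,Value> e : bs) {
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      } finally {
        bs.close();
      }
    }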


That's nice that Accumulo has this built-in. Does it allow the client to
specify the split points for the scan in some way?

>
>
>
>  Phoenix manages the metadata of your tables (tables, columns, indexes,
>> views, etc) in an HBase table. DDL statements such as CREATE TABLE, DROP
>> TABLE, ALTER TABLE are atomic, transactional operations b/c we don't want
>> our metadata table to get in a corrupt state. To accomplish this, we rely
>> on:
>> - setting a "split policy" that ensures that the table data for a given
>> "tenant" (we support multi-tenancy:
>> http://phoenix.incubator.apache.org/multi-tenancy.html) stay together in
>> the same region.
>> - putting the data using an API that guarantees that either the entire
>> batch of mutations succeeds or fails completely.
>> Again, these are details of our implementation on HBase which do not
>> necessarily need to be implemented in the same way on a different system.
>>
>
> I'd have to look again at how our mutation failures are handled (or
> someone else can chime in). This might be something to keep an eye on
> depending on the distribution of mutations with regard to tables.
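
For what it's worth, my (possibly stale) understanding of the Accumulo
write path: all the updates packed into a single Mutation target one
row and apply atomically, but there's no atomicity across rows in a
batch. A sketch with made-up names:

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Mutation;

    void writeMetadataRow(Connector conn) throws Exception {
      BatchWriter bw = conn.createBatchWriter("phoenix_meta",
          new BatchWriterConfig());
      try {
        // every put below is against one row, so it applies atomically
        Mutation m = new Mutation("tenant1.mytable");
        m.put("ddl", "state", "ACTIVE");
        m.put("ddl", "version", "1");
        bw.addMutation(m);
      } finally {
        bw.close(); // MutationsRejectedException = some writes failed
      }
    }
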
>
>
>  Phoenix supports sequences which are atomically incremented values. This
>> is
>> done through a coprocessor currently, due to some limitations with the
>> HBase Increment API, but the idea is the same as an atomic increment.
>>
>
> Conditional Mutations in the about-to-be-released version 1.6.0 will
> provide this.
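
That sounds like a good fit for sequences. Presumably something like
this (a sketch against the 1.6.0 API; table and column names are made
up):

    import org.apache.accumulo.core.client.ConditionalWriter;
    import org.apache.accumulo.core.client.ConditionalWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Condition;
    import org.apache.accumulo.core.data.ConditionalMutation;

    // Compare-and-swap a sequence: the put only lands if the current
    // value still matches what we read. On anything but ACCEPTED,
    // re-read and retry.
    long increment(Connector conn, long current) throws Exception {
      ConditionalWriter cw = conn.createConditionalWriter("sequences",
          new ConditionalWriterConfig());
      try {
        ConditionalMutation cm = new ConditionalMutation("my_sequence");
        cm.addCondition(new Condition("seq", "value")
            .setValue(Long.toString(current)));
        cm.put("seq", "value", Long.toString(current + 1));
        if (cw.write(cm).getStatus() != ConditionalWriter.Status.ACCEPTED) {
          throw new RuntimeException("lost the race; re-read and retry");
        }
        return current + 1;
      } finally {
        cw.close();
      }
    }
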
>
>
>  Phoenix does the following push down:
>> - the WHERE clause gets transformed into three things: a start/stop key of
>> a scan, a skip scan filter to efficiently navigate the key space (see
>> http://phoenix-hbase.blogspot.com/2013/05/demystifying-skip-
>> scan-in-phoenix.html),
>> and a custom filter to rule out a row based on some java code that does
>> expression evaluation.
>> - the GROUP BY clause gets pushed to the server and a coprocessor runs the
>> scan on each region so that the client doesn't have to get back all the
>> raw
>> data. Instead, the client gets back the aggregated data (to conserve
>> network bandwidth and to run the scan where the data lives). The client
>> then does a final merge sort.
>>
>
> I've written an iterator to do a group by previously. Depending on the
> schema this is fine.
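
For a flavor of what server-side aggregation looks like there, the
simplest built-in hook is a Combiner, which folds together all Values
sharing a Key (ignoring timestamp). A full GROUP BY across rows would
need a custom iterator like the one you describe, but as a sketch:

    import java.util.Iterator;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Combiner;

    // Folds all values of the same row+column into a single sum.
    public class SumCombiner extends Combiner {
      @Override
      public Value reduce(Key key, Iterator<Value> iter) {
        long sum = 0;
        while (iter.hasNext()) {
          sum += Long.parseLong(new String(iter.next().get()));
        }
        return new Value(Long.toString(sum).getBytes());
      }
    }

It gets attached with an IteratorSetting (e.g. via
Combiner.setCombineAllColumns), either per-scan or in the table config.
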
>
>
>  - the ORDER BY clause used in combination with the LIMIT clause is a TopN
>> query. We optimize this by each region holding on to the top N values with
>> the client then doing a merge sort with the limit applied.
>>
>
> This is an interesting one. If you removed the possibility of tablets
> splitting out from underneath you and had a view of the splits, you
> could probably pull it off.
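
The client-side half of that is straightforward, at least - a rough
sketch, assuming each tablet has already trimmed to its local top N
("Row" is a made-up placeholder):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Merge per-tablet top-N lists; the client only ever touches
    // (numTablets * n) rows instead of the whole table.
    static <Row> List<Row> mergeTopN(List<List<Row>> perTablet,
        Comparator<Row> cmp, int n) {
      PriorityQueue<Row> heap = new PriorityQueue<Row>(11, cmp);
      for (List<Row> tablet : perTablet) {
        heap.addAll(tablet);
      }
      List<Row> topN = new ArrayList<Row>();
      while (topN.size() < n && !heap.isEmpty()) {
        topN.add(heap.poll());
      }
      return topN;
    }
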
>
>
>  - the ORDER BY clause on its own gets executed on each region (spooled
>> using memory-mapped files) and then the client does a merge sort. This
>> spooling could potentially be done on the client side.
>>
>
> Unless we can do some trickery with the schema, yeah, client side.
>
>
>  - joins are executed as a broadcast hash join. We run one side of the
>> query
>> (with the filters applied), compact the results, and send them to each
>> region server where they are cached while we run the other side of the
>> query. A coprocessor then does a map lookup (only equi-joins are supported
>> currently) to join based on the join key and returns the joined results
>> (i.e. the concatenated values in a single, condensed key value as access
>> from the client is positional post-join).
>>
>
> The join approach would need to be implemented some other way for the
> earlier stated comparison of iterators and coprocessors.


Client-side could be another fallback. The coprocessor approach is really
only a big win in two cases: if you have a join which doesn't have many
matches (as those rows get filtered on the server-side), or for correlated
subqueries or EXISTS queries where you can filter or collapse many rows to
one or none on the server-side rather than return them all to the client.
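
A rough sketch of that client-side fallback, with a made-up Row
placeholder (equi-join only, concatenating values the way Phoenix does
post-join):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class HashJoinSketch {
      // Made-up placeholder: a join key plus the rest of the row.
      static class Row {
        final String key;
        final String values;
        Row(String key, String values) { this.key = key; this.values = values; }
      }

      // Build a hash table from the smaller (filtered) side, then
      // stream the larger side and probe.
      static List<Row> hashJoin(Iterable<Row> buildSide, Iterable<Row> probeSide) {
        Map<String, List<Row>> built = new HashMap<String, List<Row>>();
        for (Row r : buildSide) {
          List<Row> bucket = built.get(r.key);
          if (bucket == null) {
            bucket = new ArrayList<Row>();
            built.put(r.key, bucket);
          }
          bucket.add(r);
        }
        List<Row> out = new ArrayList<Row>();
        for (Row p : probeSide) {
          List<Row> matches = built.get(p.key);
          if (matches == null) continue; // no match -> row filtered out
          for (Row b : matches) {
            out.add(new Row(p.key, b.values + "|" + p.values));
          }
        }
        return out;
      }
    }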

>
>
>  For our global secondary indexes (local secondary indexes are different as
>> we discussed already), we trap updates to the data table through a
>> coprocessor. For index maintenance, when a change occurs to a data row,
>> you need to know what the prior value of the row was. The reason is that you
>> need to delete the index row corresponding to the old data row and then
>> insert the index row corresponding to the new value (remember, the index
>> value makes up the row key). By doing this operation through a
>> coprocessor,
>> we know that we can get the prior data row state locally. We still need to
>> issue a Put from one region server to another, but this isn't really an
>> extra hop, as if it were done on the client, the same hop would need to be
>> done (but the old row state would need to be pulled over to the client
>> which is not necessary with the coprocessor based approach). For more on
>> global secondary indexing, see
>> http://phoenix.incubator.apache.org/secondary_indexing.html (there are
>> some
>> good presentations at the end of the page that provide more technical
>> detail).
>>
>
> Right, you want to remove the old index value and update a new index value
> (actually being two unique keys) in the same transaction to ensure a valid
> index. Or, at least, ensure that you never remove the old value and then
> die before inserting the new value.
>
> Again, not going to work well in an iterator.
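
Spelled out, the write it boils down to is something like this (a
sketch with made-up names; the ordering is the "never remove the old
value before the new one is durable" point above):

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Mutation;

    // Old and new index entries live under different row keys, so
    // Accumulo's per-row atomicity can't cover the pair - that's the
    // gap discussed above. Inserting first and flushing means a crash
    // leaves a stale extra entry rather than a missing one.
    void maintainIndex(Connector conn, byte[] oldIndexRow, byte[] newIndexRow)
        throws Exception {
      BatchWriter bw = conn.createBatchWriter("my_index",
          new BatchWriterConfig());
      try {
        Mutation insert = new Mutation(newIndexRow);
        insert.put("idx", "", "");
        bw.addMutation(insert);
        bw.flush(); // new entry durable before touching the old one
        Mutation delete = new Mutation(oldIndexRow);
        delete.putDelete("idx", "");
        bw.addMutation(delete);
      } finally {
        bw.close();
      }
    }
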
>
>
>  Phoenix also allows "point-in-time" queries where a client may establish a
>> connection at an earlier timestamp. If your table is setup to keep
>> multiple
>> versions of the same row, then you can query "back-in-time" and will see
>> the data as it was at that point. We more or less get this for free with
>> the MVCC model of HBase by specifying a max timestamp on a scan. One
>> slightly tricky bit is we correlate the current DDL of your table based on
>> the same timestamp as with your data. So when you go back-in-time like
>> this, you'll also see the structure of your table as it was at that time.
>>
>
> I don't see this as a problem. As long as we remove the versioning
> iterator from a table (which keeps the most recent version of a key by
> default), it should be pretty easy to implement an iterator which adheres
> to the "max timestamp" semantics.
>
>
>  So we do rely on coprocessors, but the underlying APIs we're accessing on
>> the server side are pretty light.
>>
>>   TLDR? Let's continue in the JIRA?
>>
>>>
>>>>
>>> Mailing list is fine by me while we get this hashed out :). We can
>>> move to Jira when we start getting into specifics.
>>>
>>>
>>
