accumulo-user mailing list archives

From Russ Weeks <rwe...@newbrightidea.com>
Subject Re: Combiner behaviour
Date Tue, 25 Mar 2014 16:57:28 GMT
Wow, that's awesome, thanks very much Josh!

-Russ


On Mon, Mar 24, 2014 at 3:41 PM, Josh Elser <josh.elser@gmail.com> wrote:

> Russ,
>
> Check out https://github.com/joshelser/accumulo-column-summing
>
> Using the SummingCombiner with a call to ScannerBase#fetchColumn(Text,Text)
> will be a pretty decent solution for modest data sets. The reason (better
> articulated than previously) why the SummingCombiner alone is sub-par is
> that it only sums within a single row, not across rows. That is why making
> a custom iterator to sum across rows is desirable.
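>
> For concreteness, here's a minimal sketch of that simpler approach (the
> table name "docs" and the connector variable are illustrative, not from
> the repo):
>
> import java.util.Collections;
>
> import org.apache.accumulo.core.client.IteratorSetting;
> import org.apache.accumulo.core.client.Scanner;
> import org.apache.accumulo.core.iterators.LongCombiner;
> import org.apache.accumulo.core.iterators.user.SummingCombiner;
> import org.apache.accumulo.core.security.Authorizations;
> import org.apache.hadoop.io.Text;
>
> // Sum meta:size within each row, server-side.
> IteratorSetting cfg = new IteratorSetting(10, "sum", SummingCombiner.class);
> SummingCombiner.setEncodingType(cfg, LongCombiner.Type.STRING);
> SummingCombiner.setColumns(cfg,
>     Collections.singletonList(new IteratorSetting.Column("meta", "size")));
>
> Scanner scanner = connector.createScanner("docs", Authorizations.EMPTY);
> scanner.fetchColumn(new Text("meta"), new Text("size"));
> scanner.addScanIterator(cfg);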
>
> Here are some results; you can try running the microbenchmark yourself
> from the test class in the above repository. It creates a table with 1M
> rows, 7 columns per row, and sums over a single column. We can lower the
> split threshold on the table to split it into more tablets, which should
> give more realistic performance (you pay the penalty for the RPC calls
> that you would pay at "scale"). The reduction in the number of keys
> returned (and thus the amount of data sent over the wire) is the primary
> reason this approach is desirable.
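>
> For reference, the split threshold is just a table property; the property
> name below is real, the value is only a benchmark-sized guess:
>
> connector.tableOperations().setProperty("docs", "table.split.threshold", "100K");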
>
> Hope this makes things clearer!
>
> Number of splits for table: 65
> Number of results to sum: 66
> Time for iterator: 4482 ms
> Number of results to sum: 1000000
> Time for combiner: 4314 ms
>
> Number of results to sum: 66
> Time for iterator: 3651 ms
> Number of results to sum: 1000000
> Time for combiner: 3754 ms
>
> Number of results to sum: 66
> Time for iterator: 3685 ms
> Number of results to sum: 1000000
> Time for combiner: 3839 ms
>
> Number of results to sum: 66
> Time for iterator: 3643 ms
> Number of results to sum: 1000000
> Time for combiner: 4066 ms
>
> Number of results to sum: 66
> Time for iterator: 3880 ms
> Number of results to sum: 1000000
> Time for combiner: 4084 ms
>
>
> On 3/20/14, 9:49 PM, Josh Elser wrote:
>
>> Russ,
>>
>> Close to it. I'll try to work up some actual code for what I'm suggesting.
>>
>> On 3/20/14, 1:12 AM, Russ Weeks wrote:
>>
>>> Hi, Josh,
>>>
>>> Thanks for walking me through this.  This is my first stab at it:
>>>
>>> import java.io.IOException;
>>> import java.util.Collection;
>>>
>>> import org.apache.accumulo.core.data.ByteSequence;
>>> import org.apache.accumulo.core.data.Key;
>>> import org.apache.accumulo.core.data.Range;
>>> import org.apache.accumulo.core.data.Value;
>>> import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>> import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>> import org.apache.accumulo.core.iterators.WrappingIterator;
>>>
>>> public class RowSummingCombiner extends WrappingIterator {
>>>
>>>   private Key lastKey;
>>>   private long sum;
>>>
>>>   @Override
>>>   public Key getTopKey() {
>>>     if (lastKey == null)
>>>       return super.getTopKey();
>>>     return lastKey;
>>>   }
>>>
>>>   @Override
>>>   public Value getTopValue() {
>>>     // Emitting the value finishes this pass; clearing lastKey makes
>>>     // hasTop() return false afterwards.
>>>     lastKey = null;
>>>     return new Value(Long.toString(sum).getBytes());
>>>   }
>>>
>>>   @Override
>>>   public boolean hasTop() {
>>>     return lastKey != null || super.hasTop();
>>>   }
>>>
>>>   @Override
>>>   public void seek(Range range, Collection<ByteSequence> columnFamilies,
>>>       boolean inclusive) throws IOException {
>>>     // Reset state in case of a re-seek (e.g. a retry).
>>>     lastKey = null;
>>>     sum = 0;
>>>     super.seek(range, columnFamilies, inclusive);
>>>     // Consume the range up front so the first getTopKey()/getTopValue()
>>>     // already reflects the sum.
>>>     next();
>>>   }
>>>
>>>   @Override
>>>   public void next() throws IOException {
>>>     while (super.hasTop()) {
>>>       // Copy the key; the source may reuse the object it hands back.
>>>       lastKey = new Key(super.getTopKey());
>>>       if (!lastKey.isDeleted())
>>>         sum += Long.parseLong(super.getTopValue().toString());
>>>       super.next();
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
>>>     RowSummingCombiner instance = new RowSummingCombiner();
>>>     instance.setSource(getSource().deepCopy(env));
>>>     return instance;
>>>   }
>>> }
>>>
>>> I restrict the scanner to the single CF/CQ that I'm interested in
>>> summing. The biggest disadvantage is that I can't utilize any of the
>>> logic in the Combiner class hierarchy for value decoding etc.: the
>>> logic to "combine" based on the common (row, cf, cq, vis) tuple is
>>> baked in at the top level of that hierarchy, and I don't see an easy
>>> way to plug in new behaviour. But each instance of the
>>> RowSummingCombiner returns its own sum, and then my client just has to
>>> add up a handful of values. Is this what you were getting at?
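>>>
>>> For reference, here's roughly how I'm wiring it up (assuming the class
>>> is on the tablet servers' classpath; the table name is just my local
>>> test table):
>>>
>>> Scanner scanner = connector.createScanner("docs", Authorizations.EMPTY);
>>> scanner.fetchColumn(new Text("meta"), new Text("size"));
>>> scanner.addScanIterator(
>>>     new IteratorSetting(30, "rowsum", RowSummingCombiner.class));
>>>
>>> long total = 0;
>>> for (Map.Entry<Key,Value> entry : scanner) {
>>>   total += Long.parseLong(entry.getValue().toString());
>>> }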
>>>
>>> Regards,
>>> -Russ
>>>
>>>
>>> On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser <josh.elser@gmail.com> wrote:
>>>
>>>     Ummm, you got the gist of it (I may have misspoken in what I
>>>     initially said).
>>>
>>>     My first thought was to make an iterator that filters down to the
>>>     columns you want. It doesn't look like the core includes an
>>>     iterator that does this efficiently for you (although I know I've
>>>     done something similar in the past). This iterator would scan the
>>>     rows of your table, returning just the columns you want.
>>>
>>>     000200001ccaac30 meta:size []    1807
>>>     000200001cdaac30 meta:size []    656
>>>     000200001cfaac30 meta:size []    565
>>>
>>>     Then, we could put the summing combiner on top of that iterator to
>>>     sum those and get back a single key. The row in the key you return
>>>     should be the last row you included in the sum. This way, if the
>>>     batchscanner retries under the hood, you'll resume where you left
>>>     off and won't double-count anything.
>>>
>>>     (you could even do things like sum a maximum of N rows before
>>>     returning back some intermediate count to better parallelize things)
>>>
>>>     000200001cfaac30 meta:size []    3028
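>>>
>>>     A sketch of that "at most N rows per value" idea (MAX_ROWS and the
>>>     bookkeeping are illustrative, not working code from anywhere):
>>>
>>>     // Stop after MAX_ROWS distinct rows so each pass emits an
>>>     // intermediate sum instead of one total for the whole range.
>>>     private static final int MAX_ROWS = 1000; // illustrative
>>>
>>>     public void next() throws IOException {
>>>       int rows = 0;
>>>       sum = 0; // each pass emits its own partial sum
>>>       while (super.hasTop() && rows < MAX_ROWS) {
>>>         Key k = super.getTopKey();
>>>         if (lastKey == null || !k.getRow().equals(lastKey.getRow()))
>>>           rows++; // crossed into a new row
>>>         lastKey = new Key(k);
>>>         if (!k.isDeleted())
>>>           sum += Long.parseLong(super.getTopValue().toString());
>>>         super.next();
>>>       }
>>>     }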
>>>
>>>     So, each "ScanSession" (what the batchscanner is doing underneath
>>>     the hood) would return you a value, over which your client would do
>>>     a final summation.
>>>
>>>     The final stack would be {(data from accumulo) > SKVI to project
>>>     columns > summing combiner} > final summation, where {...} denotes
>>>     work done server-side. This is one of those things that really
>>>     shines with the Accumulo API.
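>>>
>>>     In code, the client side of that stack might look roughly like this
>>>     (the iterator class name is hypothetical; fetchColumn stands in for
>>>     the projection SKVI):
>>>
>>>     BatchScanner bs =
>>>         connector.createBatchScanner("docs", Authorizations.EMPTY, 8);
>>>     bs.setRanges(Collections.singleton(new Range())); // whole table
>>>     bs.fetchColumn(new Text("meta"), new Text("size"));
>>>     bs.addScanIterator(
>>>         new IteratorSetting(30, "rowsum", RowSpanningSummer.class));
>>>
>>>     long total = 0;
>>>     for (Map.Entry<Key,Value> e : bs) // one partial sum per session
>>>       total += Long.parseLong(e.getValue().toString());
>>>     bs.close();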
>>>
>>>
>>>     On 3/19/14, 6:40 PM, Russ Weeks wrote:
>>>
>>>         Hi, Josh,
>>>
>>>         Thanks very much for your response. I think I get what you're
>>>         saying,
>>>         but it's kind of blowing my mind.
>>>
>>>         Are you saying that if I first set up an iterator that took my
>>>         key/value
>>>         pairs like,
>>>
>>>         000200001ccaac30 meta:size []    1807
>>>         000200001ccaac30 meta:source []    data2
>>>         000200001cdaac30 meta:filename []    doc02985453
>>>         000200001cdaac30 meta:size []    656
>>>         000200001cdaac30 meta:source []    data2
>>>         000200001cfaac30 meta:filename []    doc04484522
>>>         000200001cfaac30 meta:size []    565
>>>         000200001cfaac30 meta:source []    data2
>>>         000200001dcaac30 meta:filename []    doc03342958
>>>
>>>         And emitted something like,
>>>
>>>         0 meta:size [] 1807
>>>         0 meta:size [] 656
>>>         0 meta:size [] 565
>>>
>>>         And then applied a SummingCombiner on top of that iterator (at
>>>         a higher priority number, so it runs afterwards), then... it
>>>         should work, right?
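>>>
>>>         Something like this, I guess (the projection iterator class is
>>>         hypothetical; priorities and names are made up):
>>>
>>>         // projection first (priority 20), combiner after (priority 21)
>>>         IteratorSetting project =
>>>             new IteratorSetting(20, "project", RowCollapsingIterator.class);
>>>         IteratorSetting sum =
>>>             new IteratorSetting(21, "sum", SummingCombiner.class);
>>>         SummingCombiner.setEncodingType(sum, LongCombiner.Type.STRING);
>>>         SummingCombiner.setColumns(sum, Collections.singletonList(
>>>             new IteratorSetting.Column("meta", "size")));
>>>         scanner.addScanIterator(project);
>>>         scanner.addScanIterator(sum);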
>>>
>>>         I'll give it a try.
>>>
>>>         Regards,
>>>         -Russ
>>>
>>>
>>>         On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser
>>>         <josh.elser@gmail.com> wrote:
>>>
>>>              Russ,
>>>
>>>              Remember that your data is distributed across multiple
>>>              nodes in your cluster by tablet.
>>>
>>>              A tablet, at the very minimum, will contain one row.
>>>              Another way to say the same thing is that a row will never
>>>              be split across multiple tablets. The only guarantee you
>>>              get from Accumulo here is that you can use a combiner to
>>>              do your combination within one row.
>>>
>>>              However, when you combine (pun not intended) another SKVI
>>>              with the Combiner, you can do more merging of that
>>>              intermediate "combined value" from each row before
>>>              returning to the client. You can think of this approach as
>>>              a multi-level summation: each tablet server returns
>>>              partial sums, and the client adds up the partial sums.
>>>
>>>              This still requires one final sum on the client side, but
>>>         you should
>>>              get quite the reduction with this approach over doing the
>>>         entire sum
>>>              client side. You sum the meta:size column in parallel
>>>         across parts
>>>              of the table (server-side) and then client-side you sum the
>>>         sums
>>>              from each part.
>>>
>>>              I can sketch this out in more detail if it's not clear. HTH
>>>
>>>
>>>              On 3/19/14, 6:18 PM, Russ Weeks wrote:
>>>
>>>                  The Accumulo manual states that combiners can be
>>>                  applied to values which share the same rowID, column
>>>                  family, and column qualifier. Is there any way to
>>>                  adjust this behaviour? I have rows that look like,
>>>
>>>                  000200001ccaac30 meta:size []    1807
>>>                  000200001ccaac30 meta:source []    data2
>>>                  000200001cdaac30 meta:filename []    doc02985453
>>>                  000200001cdaac30 meta:size []    656
>>>                  000200001cdaac30 meta:source []    data2
>>>                  000200001cfaac30 meta:filename []    doc04484522
>>>                  000200001cfaac30 meta:size []    565
>>>                  000200001cfaac30 meta:source []    data2
>>>                  000200001dcaac30 meta:filename []    doc03342958
>>>
>>>                  and I'd like to sum up all the values of meta:size
>>>                  across all rows. I know I can scan the sizes and sum
>>>                  them on the client side, but I was hoping there would
>>>                  be a way to do this inside my cluster. Is MapReduce my
>>>                  only option here?
>>>
>>>                  Thanks,
>>>                  -Russ
>>>
>>>
>>>
>>>
