accumulo-user mailing list archives

From Adam Fuchs <afu...@apache.org>
Subject Re: Local Combiners to pre-sum at BatchWriter
Date Mon, 06 Apr 2015 16:05:35 GMT
Dylan,

Here's an interesting history note: Accumulo used to run some types of
iterators (essentially Combiners before Combiners existed) at the time of
writing data to the in-memory map. This was removed because some combiners,
like string appends, can cause O(n^2) performance when run in that scope.
This is definitely an area that could benefit from a little engineering.
ACCUMULO-519, for example, tries to shift some load from the minor and
major compactions up into a new compaction scope.

Here are a few thoughts I have on combining in the BatchWriter:
1. You probably don't want to tie the creation of batches to the
pre-combiner size. If you can fit more in a batch (currently limited by
memory size and latency) then you should do so. One important metric for
efficiency here might be the number of batches being sent per input size,
and just shrinking the number of bytes per batch probably won't get you as
far as you want to go. This speaks to having a combiner upstream of the
BatchWriter's batching and RPC steps.
2. For sparse datasets, common key distributions are heavily skewed, so
something like an LRU cache with commit on eviction might give a great
balance of low memory usage and high combining ratio. See for example
section 3.1 of [1]. This can be accomplished in Java with a LinkedHashMap
by overriding the removeEldestEntry method to commit to a BatchWriter on
eviction; a rough sketch follows my recommendation below.
3. The BatchWriter really only groups things into tablets rather than
performing a complete sort. It prefers to keep mutation ordering unchanged
within those groups rather than sorting by something like row.
4. Combiners typically operate on key/value pairs rather than mutations, so
doing this within the BatchWriter generally involves at least one
deserialize/serialize step, which is likely to cause other performance
problems. A rough illustration of that deserialize step follows this list.
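
For example, recovering key/value pairs from a Mutation (the helper name is
made up, but Mutation.getUpdates() and ColumnUpdate are the real client API)
would look something like:

  import java.util.List;

  import org.apache.accumulo.core.data.ColumnUpdate;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;

  // Unpack a Mutation back into the key/value pairs a combiner would expect.
  static void forEachKeyValue(Mutation m) {
    List<ColumnUpdate> updates = m.getUpdates(); // one ColumnUpdate per put()/putDelete()
    for (ColumnUpdate cu : updates) {
      Key k = new Key(m.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(),
          cu.getColumnVisibility(), cu.getTimestamp()); // check cu.hasTimestamp() in real code
      Value v = new Value(cu.getValue());
      // ...apply the combining function to (k, v) here...
    }
  }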

I think I would recommend trying to write a BatchWriter wrapper of some
sort that performs early combining and has an API that is more tailored
towards the performance you want to achieve.
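
To make that a bit more concrete, here's a rough, untested sketch of what such
a wrapper might look like, using the LinkedHashMap trick from (2). The class
name, the eviction threshold, and the string-encoded long sum are placeholders
for whatever combining logic your table actually uses; nothing here is existing
Accumulo API beyond BatchWriter itself:

  import java.util.LinkedHashMap;
  import java.util.Map;

  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.MutationsRejectedException;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;

  // Hypothetical wrapper: combines entries client-side, writing to the wrapped
  // BatchWriter only when a key is evicted from the LRU cache or on close().
  public class PreCombiningWriter implements AutoCloseable {

    private final BatchWriter bw;
    private final int maxCachedEntries; // eviction threshold, tune for your memory budget
    private final Map<Key,Value> cache;

    public PreCombiningWriter(BatchWriter bw, int maxCachedEntries) {
      this.bw = bw;
      this.maxCachedEntries = maxCachedEntries;
      // Access-ordered LinkedHashMap: hot (frequently combined) keys stay
      // resident, cold keys are written out and dropped first.
      this.cache = new LinkedHashMap<Key,Value>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Key,Value> eldest) {
          if (size() > PreCombiningWriter.this.maxCachedEntries) {
            writeEntry(eldest.getKey(), eldest.getValue());
            return true; // evict after committing to the BatchWriter
          }
          return false;
        }
      };
    }

    // Combine on insert. Note that Key equality includes family, qualifier,
    // visibility, and timestamp, so keys must match exactly to be combined.
    public void add(Key k, Value v) {
      Value prev = cache.get(k);
      cache.put(k, prev == null ? v : combine(prev, v));
    }

    // Placeholder combine: a string-encoded long sum, standing in for whatever
    // associative function the table's combiner actually applies.
    private static Value combine(Value a, Value b) {
      long sum = Long.parseLong(new String(a.get())) + Long.parseLong(new String(b.get()));
      return new Value(Long.toString(sum).getBytes());
    }

    // The serialize step from (4): turn a cached key/value back into a Mutation.
    private void writeEntry(Key k, Value v) {
      Mutation m = new Mutation(k.getRow());
      m.put(k.getColumnFamily(), k.getColumnQualifier(), v);
      try {
        bw.addMutation(m);
      } catch (MutationsRejectedException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void close() throws Exception {
      // Flush whatever is still cached before closing the underlying writer.
      for (Map.Entry<Key,Value> e : cache.entrySet())
        writeEntry(e.getKey(), e.getValue());
      cache.clear();
      bw.close();
    }
  }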

Adam

[1] http://lintool.github.io/MapReduceAlgorithms/MapReduce-book-final.pdf

On Sat, Apr 4, 2015 at 3:58 PM, Dylan Hutchison <dhutchis@mit.edu> wrote:

> I've been thinking about a scenario that seems common among high-ingest
> Accumulo users. Suppose we have a "combiner"-type iterator set on a table at
> all scopes.  One technique to increase ingest performance is "pre-summing":
> run the combiner on local entries before they are sent through a
> BatchWriter, in order to reduce the number of entries sent to the tablet
> server.
>
> One way to do pre-summing is to create a Map<Key,Value> of entries to send
> to the server on the local client. This equates to the following client
> code, run for each entry to send to Accumulo:
>
>   Key k = nextKeyToSend();
>   Value v = nextValueToSend();
>   Value vPrev = map.get(k);
>   if (vPrev != null)
>     v = combiner.combine(vPrev, v); // conceptual; the real Combiner API is reduce(Key, Iterator<Value>)
>   map.put(k, v);
>
> Each time our map size exceeds a threshold (don't want to run out of
> memory on the client),
>
>   BatchWriter bw; // setup previously from connector
>   for (Map.Entry<Key,Value> entry : map.entrySet()) {
>     Key k = entry.getKey();
>     Mutation m = new Mutation(k.getRow());
>     m.put(k.getColumnFamily(), k.getColumnQualifier(), entry.getValue());
>     bw.addMutation(m);
>   }
>   map.clear(); // reset the client-side cache so combined entries aren't re-sent
>
> (side note: using one entry change per mutation.  I've never investigated
> whether it would be more efficient to put all the updates to a single row
> [i.e. chaining multiple columns in the same row] in one mutation instead.)
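>
> A sketch of that alternative, assuming the map is a TreeMap so that entries
> iterate in sorted order and keys sharing a row come out adjacent, might look
> like:
>
>   Mutation m = null;
>   Text lastRow = null;
>   for (Map.Entry<Key,Value> entry : map.entrySet()) {
>     Key k = entry.getKey();
>     if (lastRow == null || !k.getRow().equals(lastRow)) {
>       if (m != null)
>         bw.addMutation(m);  // finished all columns for the previous row
>       lastRow = k.getRow();
>       m = new Mutation(lastRow);
>     }
>     m.put(k.getColumnFamily(), k.getColumnQualifier(), entry.getValue());
>   }
>   if (m != null)
>     bw.addMutation(m);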
>
> This solution works, but it duplicates the purpose of the BatchWriter and
> adds complexity to the client.  If we have to create a separate "cache"
> collection, track its size and dump to a BatchWriter once it gets too big,
> then it seems like we're reimplementing behavior the BatchWriter already
> provides: an internal cache whose size is set by
> BatchWriterConfig.setMaxMemory() (and which starts flushing once half the
> maximum memory is used). We end up with two caches (the user-created map plus
> the BatchWriter's) where one should be sufficient.
>
> I'm wondering whether there is a way to pre-sum mutations added to a
> BatchWriter automatically, so that we can add entries to a BatchWriter and
> trust that it will apply a combiner function to them before transmitting to
> the tablet server. Something to the effect of:
>
>   BatchWriter bw; // setup previously from connector
>   Combiner combiner = new SummingCombiner();
>   Map<String, String> combinerOptions = new HashMap<>();
>   combinerOptions.put("all", "true"); // or some other column subset option
>   bw.addCombiner(combiner);
>   // or perhaps more generally/ambitiously: bw.addWriteIterator(combiner);
>
>   // effect: combiner will be applied right before flushing data to server
>   // if the combiner throws an exception, then throw a MutationsRejectedException
>
> Is there a better way to accomplish this, without duplicating
> BatchWriter's buffer?  Or would this make a nice addition to the API?  If I
> understand the BatchWriter correctly, it already sorts entries before
> sending to the tablet server, because the tablet server can process them
> more efficiently that way.  If so, the overhead cost seems small to add a
> combining step after the sorting phase and before the network transmit
> phase, especially if it reduces network traffic anyway.
>
> Regards,
> Dylan Hutchison
>
>
