Subject: Re: Local Combiners to pre-sum at BatchWriter
From: David Medinets
To: user@accumulo.apache.org
Date: Sat, 4 Apr 2015 16:38:30 -0400

Aren't you essentially adding another type of Accumulo node?

On Apr 4, 2015 3:59 PM, "Dylan Hutchison" <dhutchis@mit.edu> wrote:

> I've been thinking about a scenario that seems common among high-ingest
> Accumulo users. Suppose we have a "combiner"-type iterator on a table on
> all scopes. One technique to increase ingest performance is "pre-summing":
> run the combiner on local entries before they are sent through a
> BatchWriter, in order to reduce the number of entries sent to the tablet
> server.
>
> One way to do pre-summing is to create a Map<Key,Value> of entries to send
> to the server on the local client. This equates to the following client
> code, run for each entry to send to Accumulo:
>
>   Key k = nextKeyToSend();
>   Value v = nextValueToSend();
>   Value vPrev = map.get(k);
>   if (vPrev != null)
>     v = combiner.combine(vPrev, v);
>   map.put(k, v);
>
> Each time the map size exceeds a threshold (we don't want to run out of
> memory on the client), we dump it to a BatchWriter:
>
>   BatchWriter bw; // set up previously from a connector
>   for (Map.Entry<Key,Value> entry : map.entrySet()) {
>     Key k = entry.getKey();
>     Mutation m = new Mutation(k.getRow());
>     m.put(k.getColumnFamily(), k.getColumnQualifier(), entry.getValue());
>     bw.addMutation(m);
>   }
>
> (Side note: this uses one entry change per mutation. I've never
> investigated whether it would be more efficient to put all the updates to
> a single row [i.e. chaining multiple columns in the same row] in one
> mutation instead.)
>
> This solution works, but it duplicates the purpose of the BatchWriter and
> adds complexity to the client. If we have to create a separate "cache"
> collection, track its size, and dump it to a BatchWriter once it gets too
> big, then we're reimplementing the behavior of the BatchWriter, which
> already provides an internal cache whose size is set by
> BatchWriterConfig.setMaxMemory() (and which starts flushing once half the
> maximum memory is used), and we're using two caches (the user-created map
> plus the BatchWriter) where one should be sufficient.
>
> I'm wondering whether there is a way to pre-sum mutations added to a
> BatchWriter automatically, so that we can add entries to a BatchWriter and
> trust that it will apply a combiner function to them before transmitting
> to the tablet server. Something to the effect of:
>
>   BatchWriter bw; // set up previously from a connector
>   Combiner combiner = new SummingCombiner();
>   Map<String, String> combinerOptions = new HashMap<>();
>   combinerOptions.put("all", "true"); // or some other column subset option
>   bw.addCombiner(combiner);
>   // or perhaps more generally/ambitiously: bw.addWriteIterator(combiner);
>
>   // effect: the combiner is applied right before flushing data to the
>   // server; if the combiner throws an exception, throw a
>   // MutationsRejectedException
>
> Is there a better way to accomplish this, without duplicating the
> BatchWriter's buffer? Or would this make a nice addition to the API? If I
> understand the BatchWriter correctly, it already sorts entries before
> sending them to the tablet server, because the tablet server can process
> them more efficiently that way. If so, the overhead cost of adding a
> combining step after the sorting phase and before the network-transmit
> phase seems small, especially since it reduces network traffic anyway.
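[Editor's note: the client-side pre-summing pattern described above can be sketched as a small self-contained class. Plain String keys and long values stand in for Accumulo's Key/Value, and a flush callback stands in for the BatchWriter; the class and method names here are illustrative, not Accumulo API.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Sketch of a client-side pre-summing cache: combines values for duplicate
 * keys locally, then flushes every combined entry to a sink (the role the
 * BatchWriter plays in the email) once a size threshold is reached.
 */
class PreSummingCache {
    private final Map<String, Long> cache = new HashMap<>();
    private final int maxEntries;
    private final BiConsumer<String, Long> sink; // stand-in for bw.addMutation(...)

    PreSummingCache(int maxEntries, BiConsumer<String, Long> sink) {
        this.maxEntries = maxEntries;
        this.sink = sink;
    }

    /** Combine locally (here: summing, like v = combiner.combine(vPrev, v)). */
    void add(String key, long value) {
        cache.merge(key, value, Long::sum);
        if (cache.size() >= maxEntries) {
            flush(); // don't run out of memory on the client
        }
    }

    /** Dump each combined entry to the sink, as the BatchWriter loop does. */
    void flush() {
        cache.forEach(sink);
        cache.clear();
    }
}
```

This makes the duplication complaint concrete: the threshold check and flush loop replicate exactly what the BatchWriter's internal memory-bounded buffer already does.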
>
> Regards,
> Dylan Hutchison
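[Editor's note: the closing argument, that a combining step added after the BatchWriter's internal sort would be cheap, comes down to merging runs of equal keys in one linear pass over sorted entries. A minimal sketch, again with plain String/long pairs standing in for Accumulo's Key/Value:]

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SortedCombine {
    /**
     * Sort entries by key, then sum runs of equal keys in a single linear
     * pass: once entries are sorted, duplicates are adjacent, so the
     * combining step the email proposes could piggyback on the sort the
     * BatchWriter is said to perform anyway.
     */
    static List<Map.Entry<String, Long>> sortAndCombine(
            List<Map.Entry<String, Long>> entries) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(entries);
        sorted.sort(Map.Entry.comparingByKey());

        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(e.getKey())) {
                // adjacent after sorting => combine (here: sum) in place
                out.set(last, new SimpleEntry<>(e.getKey(),
                        out.get(last).getValue() + e.getValue()));
            } else {
                out.add(new SimpleEntry<>(e.getKey(), e.getValue()));
            }
        }
        return out;
    }
}
```

The combine pass is O(n) on top of the O(n log n) sort, which is the sense in which the added overhead is small relative to the network traffic it saves.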