From: Ryan Cunningham
Date: Mon, 15 Aug 2016 11:36:50 -0400
To: user@accumulo.apache.org
Subject: Re: Accumulo Limiting Iterator Help

Oh ok, that is much simpler. Still wrapping my head around iterators.

Good point about the re-init of the iterator. I think in my case the iterator was filling up the batch before hitting the limit, and then a new instance was created that started counting back at 0. I was able to work around this by increasing the table setting 'table.scan.max.memory', which seemed to increase the batch size, and the iterator then worked for the higher limits. Still, it doesn't seem that this iterator will work reliably, especially if other conditions can cause a new iterator instance to be created. I guess I can see why iterator logic that uses the range is discouraged haha.

Thanks for the help, Dylan!

On Fri, Aug 12, 2016 at 8:39 PM, Dylan Hutchison <dhutchis@cs.washington.edu> wrote:
Hi Ryan,

I think you could achieve the behavior you described more simply by overriding hasTop() and returning `false` once your iterator has seen and emitted N entries. No need to re-seek the parent iterator to a singleton range, since that will have the same effect as hasTop() == false. Also, you're not guaranteed that the singleton range is valid if the seek range has an exclusive end key.
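Dylan's suggestion can be sketched outside Accumulo as a wrapper whose hasTop() starts returning false once N entries have been emitted, with no re-seek of the source. In real code this would presumably be a subclass of Accumulo's WrappingIterator overriding hasTop() and next(); here SimpleIterator, ListSource, and LimitingIterator are hypothetical stand-ins so the sketch runs with only the JDK. The count lives in instance state, so it is still exposed to the re-creation caveat Dylan raises below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, JDK-only model of the hasTop()-based limit; not Accumulo's API.
public class LimitingIteratorSketch {

  /** Hypothetical stand-in for the parts of SortedKeyValueIterator used here. */
  public interface SimpleIterator {
    boolean hasTop();
    String getTopKey();
    void next();
  }

  /** Source backed by an already-sorted list of keys. */
  public static class ListSource implements SimpleIterator {
    private final List<String> keys;
    private int pos = 0;

    public ListSource(List<String> sortedKeys) { this.keys = sortedKeys; }
    public boolean hasTop() { return pos < keys.size(); }
    public String getTopKey() { return keys.get(pos); }
    public void next() { pos++; }
  }

  /** Passes entries through until `limit` of them have been consumed, then reports no top. */
  public static class LimitingIterator implements SimpleIterator {
    private final SimpleIterator source;
    private final int limit;
    private int emitted = 0; // per-instance state: lost if the iterator is re-created

    public LimitingIterator(SimpleIterator source, int limit) {
      this.source = source;
      this.limit = limit;
    }

    // No re-seek of the source: hasTop() == false is enough to end the scan.
    public boolean hasTop() { return emitted < limit && source.hasTop(); }
    public String getTopKey() { return source.getTopKey(); }
    public void next() {
      emitted++;
      source.next();
    }
  }

  /** Reads entries the way a scan does: take the top, then advance. */
  public static List<String> drain(SimpleIterator it) {
    List<String> out = new ArrayList<>();
    while (it.hasTop()) {
      out.add(it.getTopKey());
      it.next();
    }
    return out;
  }

  public static void main(String[] args) {
    SimpleIterator source = new ListSource(Arrays.asList("a", "b", "c", "d", "e"));
    System.out.println(drain(new LimitingIterator(source, 3))); // [a, b, c]
  }
}
```

If the source runs out before the limit, hasTop() simply follows the source, so a short range returns everything it has.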

I couldn't follow the meaning behind the `numEntriesPerRange` and `numScans` variables, but hopefully the above advice helps. I'm also not sure about the IOException.

Keep in mind that Accumulo can take down your iterator and re-create it at any point. When it does so, it re-inits and then re-seeks your iterator to a position immediately after the last key returned. If this happens in the middle of your iterator counting N entries, then it will start counting again from 0. See the iterator design section in the manual for more info on when this happens.
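The effect of a tear-down on a counting iterator can be simulated in a few lines. This is a simplified model, not Accumulo's actual scheduling: it assumes each batch holds a fixed number of entries (real batches are bounded by memory, e.g. table.scan.max.memory) and that the iterator is re-created and re-seeked just past the last returned key after every full batch. ReseekSimulation and scan are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model (assumption): fixed-size batches, and a fresh iterator instance,
// re-seeked just past the last returned key, after every full batch.
public class ReseekSimulation {

  /** Returns the keys a client would receive for a per-instance limit and batch size. */
  public static List<String> scan(List<String> sortedKeys, int limit, int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1");
    }
    List<String> results = new ArrayList<>();
    int resumeAt = 0; // index just after the last key returned so far
    while (true) {
      int emitted = 0; // new iterator instance: the counter starts again at 0
      int pos = resumeAt;
      int inBatch = 0;
      while (inBatch < batchSize && emitted < limit && pos < sortedKeys.size()) {
        results.add(sortedKeys.get(pos));
        pos++;
        emitted++;
        inBatch++;
      }
      if (emitted >= limit || pos >= sortedKeys.size()) {
        return results; // iterator reports hasTop() == false, so the scan ends
      }
      resumeAt = pos; // batch full: tear down, re-create, and re-seek after the last key
    }
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("k0", "k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9");
    System.out.println(scan(keys, 3, 5).size()); // 3: limit reached inside the first batch
    System.out.println(scan(keys, 7, 5).size()); // 10: counter resets each batch, limit never hit
    System.out.println(scan(keys, 7, 8).size()); // 7: a larger batch holds the whole limit
  }
}
```

With N = 3 and batches of 5, the limit is hit inside the first batch. With N = 7, the counter resets at each batch boundary and all 10 entries come back, which is consistent with the "works for low N, returns everything for high N" symptom. A batch of 8 makes N = 7 work again, much like Ryan's table.scan.max.memory workaround, but only as long as N fits in one batch.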

Cheers, Dylan

On Fri, Aug 12, 2016 at 3:02 PM, Ryan Cunningham <ryanc1934@gmail.com> wrote:

Hello,

I'm trying to write an iterator that gets the top N sorted entries for a given range over sharded data. I created a custom iterator that extends SkippingIterator and made it return the first N entries for each tablet. After N entries, I have the source iterator seek to the end key of the specific range, since it shouldn't return any other entries for that tablet.
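Because the limit is applied per tablet, a scan over several tablets returns up to N entries from each, so the overall top N still has to be selected on the client. A minimal sketch, assuming keys are plain Strings (real results would be Map.Entry<Key,Value> from a Scanner or BatchScanner) that may arrive unordered, as BatchScanner results can. ClientTopN and topN are hypothetical names. The sketch keeps the N smallest keys in a bounded max-heap:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical client-side step: pick the global top N from per-tablet results.
public class ClientTopN {

  /** Returns the n smallest keys in sorted order, whatever order they arrive in. */
  public static List<String> topN(Iterable<String> keys, int n) {
    // Max-heap of the n smallest keys seen so far; its head is the largest of them.
    PriorityQueue<String> heap = new PriorityQueue<>(Collections.reverseOrder());
    for (String key : keys) {
      heap.add(key);
      if (heap.size() > n) {
        heap.poll(); // evict the current largest so at most n keys remain
      }
    }
    List<String> result = new ArrayList<>(heap);
    Collections.sort(result);
    return result;
  }

  public static void main(String[] args) {
    // e.g. three tablets that each returned their own top 3, interleaved
    List<String> returned = Arrays.asList("g", "b", "e", "a", "h", "c", "d", "i", "f");
    System.out.println(topN(returned, 3)); // [a, b, c]
  }
}
```

The heap never holds more than N + 1 keys, so memory stays bounded even when many tablets each contribute N entries.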


  @Override
  public void init(SortedKeyValueIterator<Key,Value> source,
      Map<String,String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    String o = options.get(NUM_SCANS_STRING_NAME);
    numScans = o == null ? 10 : Integer.parseInt(o);
    String n = options.get(NUM_ENTRIES_STRING_NAME);
    numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
    numEntries = 0;
  }

  // this is only ever called immediately after getting "next" entry
  @Override
  protected void consume() throws IOException {
    if (numEntries < numEntriesPerRange) {
      ++numEntries;
      return;
    }
    int count = 0;
    while (getSource().hasTop()) {
      if (count < numScans) {
        ++count;
        getSource().next(); // scan
      } else {
        // too many scans, just seek to end of range
        Key lastKey = latestRange.getEndKey() == null
            ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
            : latestRange.getEndKey().followingKey(PartialKey.ROW);
        getSource().seek(new Range(lastKey, true, lastKey, false),
            latestColumnFamilies, latestInclusive);
      }
    }
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies,
      boolean inclusive) throws IOException {
    // save parameters for future internal seeks
    latestRange = range;
    latestColumnFamilies = columnFamilies;
    latestInclusive = inclusive;

    super.seek(range, columnFamilies, inclusive);

    if (getSource().hasTop()) {
      if (range.beforeStartKey(getSource().getTopKey()))
        consume();
    }
  }

I did some initial testing and it seems to work as expected, bringing back N * number of tablets results. However, when I increase the limit past a certain point, something goes wrong and I get all entries back instead of the limited count. I also sometimes see this error, but after looking online I'm not sure if it's related:

16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
java.io.IOException: The stream is closed
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
        at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
        at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
        at java.lang.Thread.run(Thread.java:745)

Does anyone have any idea why the iterator would work for lower values of N but not higher ones? Also, I don't have a lot of experience with iterators and am not confident that the seek in consume() is right. What is the best way to skip the rest of a range in an iterator? Or is this not feasible?

Any help would be greatly appreciated!

Thanks,

Ryan


