accumulo-user mailing list archives

From Dylan Hutchison <dhutc...@cs.washington.edu>
Subject Re: Accumulo Limiting Iterator Help
Date Sat, 13 Aug 2016 00:39:47 GMT
Hi Ryan,

I think you could achieve the behavior you described more simply by
overriding hasTop() to return `false` once your iterator has seen and
emitted N entries.  There is no need to re-seek the parent iterator to a
singleton range, since that has the same effect as hasTop() == false.  Also,
the singleton range is not guaranteed to be valid if the seek range has an
exclusive end key.
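
A minimal sketch of that idea follows. It deliberately avoids the Accumulo
classes so that it runs on its own: `TopSource`, `ListSource`, and
`FirstNIterator` are hypothetical stand-ins for the hasTop()/getTop()/next()
contract, not the real SortedKeyValueIterator API. In a real iterator the
same counter check would go into your overridden hasTop().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the hasTop/getTop/next contract (not Accumulo's interface). */
interface TopSource<T> {
  boolean hasTop();
  T getTop();
  void next();
}

/** Hypothetical in-memory source that walks an already sorted list. */
class ListSource<T> implements TopSource<T> {
  private final List<T> entries;
  private int pos = 0;

  ListSource(List<T> entries) { this.entries = entries; }

  public boolean hasTop() { return pos < entries.size(); }
  public T getTop() { return entries.get(pos); }
  public void next() { pos++; }
}

/** Emits at most `limit` entries, then reports hasTop() == false. No re-seek needed. */
class FirstNIterator<T> implements TopSource<T> {
  private final TopSource<T> source;
  private final int limit;
  private int emitted = 0; // entries the consumer has already moved past

  FirstNIterator(TopSource<T> source, int limit) {
    this.source = source;
    this.limit = limit;
  }

  public boolean hasTop() { return emitted < limit && source.hasTop(); }
  public T getTop() { return source.getTop(); }

  public void next() {
    emitted++;
    // Only advance the source while more entries may still be emitted.
    if (emitted < limit) {
      source.next();
    }
  }

  /** Helper for the example: collect everything the iterator emits. */
  static <T> List<T> drain(TopSource<T> it) {
    List<T> out = new ArrayList<>();
    while (it.hasTop()) {
      out.add(it.getTop());
      it.next();
    }
    return out;
  }
}
```

Draining a `FirstNIterator` with limit 3 over the list 1..5 stops after the
third entry, without ever touching the rest of the source.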

I couldn't follow the meaning behind the `numEntriesPerRange` and
`numScans` variables, but hopefully the above advice helps.  I'm also not
sure about the IOException.

Keep in mind that Accumulo can take down your iterator and re-create it at
any point.  When it does so, it re-inits and then re-seeks your iterator to
a position immediately after the last key returned.  If this happens in the
middle of your iterator counting N entries then it will start counting
again from 0.  See the iterator design section
<https://accumulo.apache.org/1.7/accumulo_user_manual#_iterator_design> in
the manual for more info on when this happens.
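
To make the counting problem concrete, here is a toy model of it. It is a
sketch under simplifying assumptions, not a description of Accumulo's
internals: `RecreationDemo`, `scan`, and `batchSize` are hypothetical names,
and the model simply assumes the iterator is torn down and rebuilt after
every `batchSize` returned entries. Under that assumption a limit below the
batch size is honored, while a limit above it never takes effect, which may
be related to what you are seeing for larger N.

```java
import java.util.ArrayList;
import java.util.List;

class RecreationDemo {
  /**
   * Toy model (hypothetical): scans sorted keys with a per-iterator limit,
   * rebuilding the iterator after every batchSize returned entries.
   */
  static List<Integer> scan(List<Integer> sortedKeys, int limit, int batchSize) {
    List<Integer> returned = new ArrayList<>();
    int start = 0; // index just after the last key returned
    while (start < sortedKeys.size()) {
      int emitted = 0; // fresh iterator: re-init resets the counter
      int inBatch = 0;
      int pos = start; // re-seek to just after the last key returned
      while (pos < sortedKeys.size() && emitted < limit && inBatch < batchSize) {
        returned.add(sortedKeys.get(pos));
        pos++;
        emitted++;
        inBatch++;
      }
      if (emitted >= limit || pos >= sortedKeys.size()) {
        break; // iterator reported no more entries, so the scan ends
      }
      start = pos; // iterator torn down mid-count; the count is lost
    }
    return returned;
  }
}
```

With ten keys and a batch size of 5 in this model, a limit of 3 returns three
entries, but a limit of 8 returns all ten, because the counter never reaches 8
before it is reset.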

Cheers, Dylan

On Fri, Aug 12, 2016 at 3:02 PM, Ryan Cunningham <ryanc1934@gmail.com>
wrote:

> Hello,
>
> I'm trying to write an iterator that gets the top N sorted entries for a
> given range over sharded data. I created a custom iterator that extends
> SkippingIterator and made it so that it will return the first N entries for
> each tablet. After N entries, I have the source iterator seek to the end
> key of the specific range since it shouldn't return any other entries for
> that tablet.
>
>   @Override
>   public void init(SortedKeyValueIterator<Key,Value> source,
>       Map<String,String> options, IteratorEnvironment env) throws IOException {
>     super.init(source, options, env);
>     String o = options.get(NUM_SCANS_STRING_NAME);
>     numScans = o == null ? 10 : Integer.parseInt(o);
>     String n = options.get(NUM_ENTRIES_STRING_NAME);
>     numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
>     numEntries = 0;
>   }
>
>   // this is only ever called immediately after getting "next" entry
>   @Override
>   protected void consume() throws IOException {
>     if (numEntries < numEntriesPerRange) {
>       ++numEntries;
>       return;
>     }
>     int count = 0;
>     while (getSource().hasTop()) {
>       if (count < numScans) {
>         ++count;
>         getSource().next(); // scan
>       } else {
>         // too many scans, just seek to end of range
>         Key lastKey = latestRange.getEndKey() == null
>             ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
>             : latestRange.getEndKey().followingKey(PartialKey.ROW);
>         getSource().seek(new Range(lastKey, true, lastKey, false),
>             latestColumnFamilies, latestInclusive);
>       }
>     }
>   }
>
>   @Override
>   public void seek(Range range, Collection<ByteSequence> columnFamilies,
>       boolean inclusive) throws IOException {
>     // save parameters for future internal seeks
>     latestRange = range;
>     latestColumnFamilies = columnFamilies;
>     latestInclusive = inclusive;
>
>     super.seek(range, columnFamilies, inclusive);
>
>     if (getSource().hasTop()) {
>       if (range.beforeStartKey(getSource().getTopKey()))
>         consume();
>     }
>   }
>
> I did some initial testing and it seems to work as expected, bringing back
> N * number of tablets results. However, when I increase the limit past a
> certain point something seems to be messing up and I get all entries back
> instead of the limited count. I also sometimes see this error but I looked
> online and I'm not sure if it's related:
>
> 16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
> java.io.IOException: The stream is closed
>         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
>         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>         at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>         at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>         at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
>         at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
>         at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
>         at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
>         at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
>         at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
>         at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
>         at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>         at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
>         at java.lang.Thread.run(Thread.java:745)
>
> Does anyone have any idea why the iterator would work for lower values of
> N but not higher ones? Also, I don’t have a lot of experience with
> iterators and am not confident that the seek in consume() is right. What is
> the best way to skip the rest of a range in an iterator? Or is this not
> feasible?
>
> Any help would be greatly appreciated!
>
> Thanks,
>
> Ryan
>
