accumulo-user mailing list archives

From Ryan Cunningham <ryanc1...@gmail.com>
Subject Re: Accumulo Limiting Iterator Help
Date Mon, 15 Aug 2016 15:36:50 GMT
Oh ok, that is much simpler. Still wrapping my head around iterators.
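
Dylan's suggestion below is to stop by having hasTop() return false after N entries instead of re-seeking the source. It can be sketched roughly as follows. This is a stdlib-only model, not Accumulo code. MiniIterator, SortedSource, LimitingIterator and scan are hypothetical names, and String keys stand in for Accumulo's Key/Value. In a real iterator, the same logic would likely live in a WrappingIterator subclass that overrides hasTop() and next().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Stdlib-only model of "limit via hasTop()". MiniIterator is a hypothetical
// stand-in for Accumulo's SortedKeyValueIterator<Key,Value> (String keys only).
public class LimitSketch {

  interface MiniIterator {
    void seek(String startInclusive); // position at the first key >= startInclusive
    boolean hasTop();
    String getTopKey();
    void next();
  }

  // Source over an in-memory sorted set, playing the role of one tablet's data.
  static class SortedSource implements MiniIterator {
    private final NavigableSet<String> data;
    private String top; // null means "no top entry"

    SortedSource(NavigableSet<String> data) { this.data = data; }

    public void seek(String startInclusive) { top = data.ceiling(startInclusive); }
    public boolean hasTop() { return top != null; }
    public String getTopKey() { return top; }
    public void next() { top = data.higher(top); }
  }

  // Emits at most `limit` entries, then reports hasTop() == false instead of
  // seeking the source to a singleton range past the end of the seek range.
  static class LimitingIterator implements MiniIterator {
    private final MiniIterator source;
    private final int limit;
    private int emitted; // entries returned since the last seek (per instance!)

    LimitingIterator(MiniIterator source, int limit) {
      this.source = source;
      this.limit = limit;
    }

    public void seek(String startInclusive) {
      emitted = 0;
      source.seek(startInclusive);
    }

    public boolean hasTop() { return emitted < limit && source.hasTop(); }
    public String getTopKey() { return source.getTopKey(); }

    public void next() {
      emitted++;
      source.next();
    }
  }

  // Drains an iterator from `start`, collecting every key it reports.
  static List<String> scan(MiniIterator it, String start) {
    List<String> out = new ArrayList<>();
    it.seek(start);
    while (it.hasTop()) {
      out.add(it.getTopKey());
      it.next();
    }
    return out;
  }

  public static void main(String[] args) {
    NavigableSet<String> tablet = new TreeSet<>(List.of("a", "b", "c", "d", "e"));
    System.out.println(scan(new LimitingIterator(new SortedSource(tablet), 3), "")); // prints [a, b, c]
  }
}
```

Resetting the counter in seek() makes the limit apply per seek range. The counter still lives in the iterator instance, though, so it is subject to the re-init caveat Dylan describes.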

Good point about the re-init of the iterator. I think in my case the iterator
was filling up a batch before hitting the limit, and then a new instance was
created that started counting again from 0. I was able to work around this by
increasing the table setting 'table.scan.max.memory', which seemed to increase
the batch size, and the iterator then worked for the higher limits. It doesn't
seem that this iterator will work reliably, though, especially if there are
other conditions that can cause a new iterator instance to be created. I guess
I can see why iterator logic that uses the range is discouraged haha.
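
The failure mode described above can be reproduced with a small stdlib-only simulation (not Accumulo code). It assumes, as observed here, that a new iterator instance is created for each batch and re-seeked just past the last key returned. The batch size is a hypothetical stand-in for how much data fits under table.scan.max.memory. CountingLimiter and scanInBatches are illustrative names, not Accumulo APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Stdlib-only simulation of why a per-instance counter breaks when the
// server tears an iterator down between batches. batchSize is a hypothetical
// stand-in for table.scan.max.memory; the real server limits by memory.
public class ReinitSketch {

  // hasTop()-style limiter over one tablet's sorted keys; the counter lives
  // in the instance, like numEntries in the original iterator.
  static class CountingLimiter {
    private final NavigableSet<String> data;
    private final int limit;
    private int emitted = 0;
    private String top;

    CountingLimiter(NavigableSet<String> data, int limit) {
      this.data = data;
      this.limit = limit;
    }

    // lastReturned == null means "start of range"; otherwise resume just after it.
    void seek(String lastReturned) {
      if (lastReturned == null) {
        top = data.isEmpty() ? null : data.first();
      } else {
        top = data.higher(lastReturned);
      }
    }

    boolean hasTop() { return emitted < limit && top != null; }
    String getTopKey() { return top; }
    void next() { emitted++; top = data.higher(top); }
  }

  // Each batch uses a freshly created iterator, re-seeked immediately after
  // the last key the client received.
  static List<String> scanInBatches(NavigableSet<String> data, int limit, int batchSize) {
    List<String> out = new ArrayList<>();
    String last = null;
    while (true) {
      CountingLimiter it = new CountingLimiter(data, limit); // re-init: counter back to 0
      it.seek(last);
      int inBatch = 0;
      while (it.hasTop() && inBatch < batchSize) {
        last = it.getTopKey();
        out.add(last);
        it.next();
        inBatch++;
      }
      if (inBatch < batchSize) {
        return out; // iterator stopped before the batch filled, so the scan is done
      }
    }
  }

  public static void main(String[] args) {
    NavigableSet<String> tablet = new TreeSet<>(List.of("a", "b", "c", "d", "e"));
    System.out.println(scanInBatches(tablet, 3, 10)); // prints [a, b, c]
    System.out.println(scanInBatches(tablet, 3, 2));  // prints [a, b, c, d, e]
  }
}
```

When the batch is larger than the limit, the count survives, which matches the effect of raising table.scan.max.memory. When the batch is smaller, every new instance starts counting from 0 again, so all entries come back.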

Thanks for the help Dylan!

On Fri, Aug 12, 2016 at 8:39 PM, Dylan Hutchison <dhutchis@cs.washington.edu> wrote:

> Hi Ryan,
>
> I think you could achieve the behavior you described more simply by
> overriding hasTop() and returning `false` once your iterator has seen and
> emitted N entries.  No need to re-seek the parent iterator to a singleton
> range, since that will have the same effect as hasTop() == false.  Also,
> you're not guaranteed that the singleton range is valid, if the seek range
> has an exclusive end key.
>
> I couldn't follow the meaning behind the `numEntriesPerRange` and
> `numScans` variables, but hopefully the above advice helps.  I'm also not
> sure about the IOException.
>
> Keep in mind that Accumulo can take down your iterator and re-create it at
> any point.  When it does so, it re-inits and then re-seeks your iterator to
> a position immediately after the last key returned.  If this happens in the
> middle of your iterator counting N entries then it will start counting
> again from 0.  See the iterator design section
> <https://accumulo.apache.org/1.7/accumulo_user_manual#_iterator_design>
> in the manual for more info on when this happens.
>
> Cheers, Dylan
>
> On Fri, Aug 12, 2016 at 3:02 PM, Ryan Cunningham <ryanc1934@gmail.com> wrote:
>
>> Hello,
>>
>> I'm trying to write an iterator that gets the top N sorted entries for a
>> given range over sharded data. I created a custom iterator that extends
>> SkippingIterator and made it so that it will return the first N entries for
>> each tablet. After N entries, I have the source iterator seek to the end
>> key of the specific range since it shouldn't return any other entries for
>> that tablet.
>>
>>   @Override
>>   public void init(SortedKeyValueIterator<Key,Value> source,
>>       Map<String,String> options, IteratorEnvironment env) throws IOException {
>>     super.init(source, options, env);
>>     String o = options.get(NUM_SCANS_STRING_NAME);
>>     numScans = o == null ? 10 : Integer.parseInt(o);
>>     String n = options.get(NUM_ENTRIES_STRING_NAME);
>>     numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
>>     numEntries = 0;
>>   }
>>
>>   // this is only ever called immediately after getting "next" entry
>>   @Override
>>   protected void consume() throws IOException {
>>     if (numEntries < numEntriesPerRange) {
>>       ++numEntries;
>>       return;
>>     }
>>     int count = 0;
>>     while (getSource().hasTop()) {
>>       if (count < numScans) {
>>         ++count;
>>         getSource().next(); // scan
>>       } else {
>>         // too many scans, just seek to end of range
>>         Key lastKey = latestRange.getEndKey() == null
>>             ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
>>             : latestRange.getEndKey().followingKey(PartialKey.ROW);
>>         getSource().seek(new Range(lastKey, true, lastKey, false),
>>             latestColumnFamilies, latestInclusive);
>>       }
>>     }
>>   }
>>
>>   @Override
>>   public void seek(Range range, Collection<ByteSequence> columnFamilies,
>>       boolean inclusive) throws IOException {
>>     // save parameters for future internal seeks
>>     latestRange = range;
>>     latestColumnFamilies = columnFamilies;
>>     latestInclusive = inclusive;
>>
>>     super.seek(range, columnFamilies, inclusive);
>>
>>     if (getSource().hasTop()) {
>>       if (range.beforeStartKey(getSource().getTopKey()))
>>         consume();
>>     }
>>   }
>>
>> I did some initial testing and it seems to work as expected, bringing
>> back N * (number of tablets) results. However, when I increase the limit
>> past a certain point, something goes wrong and I get all entries back
>> instead of the limited count. I also sometimes see the following error,
>> but from what I found online I'm not sure whether it's related:
>>
>> 16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
>> java.io.IOException: The stream is closed
>>         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
>>         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>         at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>         at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>>         at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
>>         at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
>>         at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
>>         at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
>>         at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
>>         at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
>>         at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
>>         at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>>         at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Does anyone have any idea why the iterator would work for lower values of
>> N but not higher ones? Also, I don’t have a lot of experience with
>> iterators and am not confident that the seek in consume() is right. What is
>> the best way to skip the rest of a range in an iterator? Or is this not
>> feasible?
>>
>> Any help would be greatly appreciated!
>>
>> Thanks,
>> Ryan
>>
>
>
