accumulo-user mailing list archives

From Ryan Cunningham <ryanc1...@gmail.com>
Subject Accumulo Limiting Iterator Help
Date Fri, 12 Aug 2016 22:02:03 GMT
Hello,



I'm trying to write an iterator that gets the top N sorted entries for a
given range over sharded data. I created a custom iterator that extends
SkippingIterator and made it so that it will return the first N entries for
each tablet. After N entries, I have the source iterator seek to the end
key of the specific range since it shouldn't return any other entries for
that tablet.



  @Override
  public void init(SortedKeyValueIterator<Key,Value> source,
      Map<String,String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    String o = options.get(NUM_SCANS_STRING_NAME);
    numScans = o == null ? 10 : Integer.parseInt(o);
    String n = options.get(NUM_ENTRIES_STRING_NAME);
    numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
    numEntries = 0;
  }



  // this is only ever called immediately after getting "next" entry
  @Override
  protected void consume() throws IOException {
    if (numEntries < numEntriesPerRange) {
      ++numEntries;
      return;
    }
    int count = 0;
    while (getSource().hasTop()) {
      if (count < numScans) {
        ++count;
        getSource().next(); // scan
      } else {
        // too many scans, just seek to end of range
        Key lastKey = latestRange.getEndKey() == null
            ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
            : latestRange.getEndKey().followingKey(PartialKey.ROW);
        getSource().seek(new Range(lastKey, true, lastKey, false),
            latestColumnFamilies, latestInclusive);
      }
    }
  }



  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies,
      boolean inclusive) throws IOException {
    // save parameters for future internal seeks
    latestRange = range;
    latestColumnFamilies = columnFamilies;
    latestInclusive = inclusive;

    super.seek(range, columnFamilies, inclusive);

    if (getSource().hasTop()) {
      if (range.beforeStartKey(getSource().getTopKey()))
        consume();
    }
  }



I did some initial testing and it seems to work as expected, bringing back
N * (number of tablets) results. However, once I increase the limit past a
certain point, the limit stops being applied and I get all entries back
instead of the limited count. I also sometimes see this error, but after
looking online I'm not sure whether it's related:



16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
java.io.IOException: The stream is closed
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
        at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
        at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
        at java.lang.Thread.run(Thread.java:745)



Does anyone have any idea why the iterator would work for lower values of N
but not higher ones? Also, I don’t have a lot of experience with iterators
and am not confident that the seek in consume() is right. What is the best
way to skip the rest of a range in an iterator? Or is this not feasible?
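
To make the intended behavior concrete, here is a minimal plain-Java model of
it. This is only a sketch and uses no Accumulo classes: each tablet is modeled
as a TreeMap, and the names TopNPerShardSketch, firstNInRange, and scanAll are
made up for illustration. It shows the result I expect (at most N entries per
tablet for the range), not how a real iterator should achieve it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TopNPerShardSketch {

  // Returns at most n keys of one shard that fall in [start, end), in sorted
  // order. Breaking out of the loop stands in for "skip the rest of the range".
  static List<String> firstNInRange(TreeMap<String, String> shard,
      String start, String end, int n) {
    List<String> out = new ArrayList<>();
    for (String key : shard.subMap(start, true, end, false).keySet()) {
      if (out.size() >= n) {
        break; // limit reached for this shard
      }
      out.add(key);
    }
    return out;
  }

  // Applies the same per-shard limit to every shard and concatenates the hits.
  static List<String> scanAll(List<TreeMap<String, String>> shards,
      String start, String end, int n) {
    List<String> all = new ArrayList<>();
    for (TreeMap<String, String> shard : shards) {
      all.addAll(firstNInRange(shard, start, end, n));
    }
    return all;
  }

  public static void main(String[] args) {
    TreeMap<String, String> shardA = new TreeMap<>();
    TreeMap<String, String> shardB = new TreeMap<>();
    for (int i = 0; i < 5; i++) {
      shardA.put("row" + i, "a" + i);       // row0 .. row4
      shardB.put("row" + (i + 5), "b" + i); // row5 .. row9
    }
    List<String> hits = scanAll(List.of(shardA, shardB), "row0", "rowz", 2);
    System.out.println(hits); // at most 2 keys per shard
  }
}
```

In this model the result size is bounded by N * (number of shards) no matter
how large N is, which is the behavior I see for small N but not for large N.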



Any help would be greatly appreciated!

Thanks,

Ryan
