accumulo-user mailing list archives

From "Slater, David M." <>
Subject RE: Improving Batchscanner Performance
Date Wed, 21 May 2014 18:30:39 GMT
Ah, here is some rfile info:

hadoop fs -ls

-rw-r--r--   2 hadoop supergroup   88638093 2014-05-21 12:44 /accumulo/tables/32/t-0014fhi/A0014h25.rf

Locality group         : <DEFAULT>
        Start block          : 0
        Num   blocks         : 2,938
        Index level 1        : 290 bytes  1 blocks
        Index level 0        : 297,686 bytes  3 blocks
        First key            : 14006808|19|000001dd-04f9-4dc4-9d0e-0d7070d6ebaa ipdst: [] 1400684778699 false
        Last key             : 14006808|19|ffffefd8-9b71-4a92-aa4d-06259dcc1e58 ttl:64 [] 1400685445408 false
        Num entries          : 7,906,420
        Column families      : [ttl, ipdst, question, response,PTR, response,NS, timestamp, ipsrc, response,A, response,TXT, response,CNAME]

Meta block     : BCFile.index
      Raw size             : 4 bytes
      Compressed size      : 12 bytes
      Compression type     : gz

Meta block     : RFile.index
      Raw size             : 620 bytes
      Compressed size      : 414 bytes
      Compression type     : gz

Meta block     : acu_bloom
      Raw size             : 3,080,506 bytes
      Compressed size      : 2,502,618 bytes
      Compression type     : gz


-----Original Message-----
From: Josh Elser []
Sent: Wednesday, May 21, 2014 1:30 PM
Subject: Re: Improving Batchscanner Performance

Inline'd this time

On 5/21/14, 12:58 PM, Slater, David M. wrote:
> You are correct that the "bin" is largely redundant. I created that because I was not
guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed),
and I'm not the one who specified it. There is another mechanism I didn't mention, which is
that the bin is prepended by a timeblock (typically an hour span), and my data is streaming.
So essentially, I create a number of splits for the next timeblock for X bins, and then when
the data input moves into that time block it can ingest directly onto empty tablets.


> I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and if I'm
reading it correctly:

Oops, you're right, I think it was introduced in 1.5, but it's just a wrapper. You can invoke
the PrintInfo class directly:

accumulo org.apache.accumulo.core.file.rfile.PrintInfo '/path/to/rfile.rf'

> 31;14006844|00 file:/t-0014fpy/A0014h4u.rf []    155454467,5450454
> This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin
> After the data gets over a day old, I do a nightly job to merge the bins for each timeblock
together, resulting in data like:
> 31;14000292|00 file:/t-0011bgk/C0011e06.rf []    1922144744,67390597
> 31;14000292|00 file:/t-0011bgk/C0011ed3.rf []    1942040855,68058489
> This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5%
of the files, if I'm reading it correctly.

I think you're confused about what those numbers mean. The two numbers in the Value are size
in bytes and number of entries, not data size and index size.

This means that your entries are about 30 bytes in size, which seems in line with what you
described given the encoding/compression Accumulo is doing.
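
As a rough check of that figure, the arithmetic can be sketched in a few lines of Java. This assumes the reading given above (the Value is "size in bytes,number of entries") and uses the three values quoted from the !METADATA table; the class and method names are made up for illustration.

```java
final class EntrySizeEstimate {
    /** Parses a "sizeInBytes,numEntries" file value and returns the average bytes per entry. */
    static double averageBytesPerEntry(String metadataValue) {
        String[] parts = metadataValue.trim().split(",");
        long sizeBytes = Long.parseLong(parts[0]);
        long numEntries = Long.parseLong(parts[1]);
        return (double) sizeBytes / numEntries;
    }

    public static void main(String[] args) {
        // The three file values quoted from the !METADATA table in this thread.
        String[] values = {"155454467,5450454", "1922144744,67390597", "1942040855,68058489"};
        for (String v : values) {
            System.out.printf("%s -> %.1f bytes/entry%n", v, averageBytesPerEntry(v));
        }
    }
}
```

All three files come out at roughly 28.5 bytes per entry on disk, i.e. after compression.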

You could try playing with table.file.compress.blocksize. IIRC, if you reduce this value from
the default 100k, you would get more blocks per RFile, which means that you get more index
records, which, in turn, would mean you can find your records faster at the cost of a larger

> In total, there are about 440 tablets per server, with 4 servers, storing a total of about
2.1 TB of data (each server has a single 1 TB HDD).
> I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo
to do that, or are bloom filters normally generated? I have an index cache of 256M for each

After a quick glance at the code, it appears that every time a Reader is opened to a file,
we check the configuration and use a BloomFilter if enabled. I don't think you need to restart
the tservers.

> Thanks,
> David
> -----Original Message-----
> From: Josh Elser []
> Sent: Wednesday, May 21, 2014 12:18 PM
> To:
> Subject: Re: Improving Batchscanner Performance
> I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual
for that matter).
> A few more questions that come to mind though...
> * What's the purpose of the "bin"? Your guid is likely random anyways which will give
you uniformity (which is what a bin prefix like that is usually meant to provide).
> * How many splits do you have on this table? At least a few per tserver?
> You could also try looking at the size of the index for a couple of rfiles for your table
(`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster
lookups than what you noted.
> On 5/20/14, 4:34 PM, Slater, David M. wrote:
>> 10-100 entries per node (4 nodes total).
>> Would changing the data table structure change the batchscanner performance?
>> I'm using:
>> row          colFam          colQual         value
>> bin|guid     --              --              byte[]
>> would it be faster/slower to use:
>> row          colFam          colQual         value
>> bin          guid            --              byte[]
>> The difference would be that the first would include everything as a Collection of
ranges, where the second would use a combination of ranges and setting column families.
>> -----Original Message-----
>> From: Josh Elser []
>> Sent: Tuesday, May 20, 2014 3:17 PM
>> To:
>> Subject: Re: Improving Batchscanner Performance
>> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is
this over more than one node? 10s of nodes?
>> A value of 1M would explain the pause that you see in the beginning. That parameter
controls the size of the buffer that each tserver will fill before sending data back to the
BatchScanner. Too small and you pay for the excessive RPCs; too large and, like you're seeing,
it takes longer for you to get the first batch. You should be able to reduce that value and
see a much quicker first result come out of the batchscanner.
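
To get a feel for the buffer-size trade-off described above, here is a deliberately crude back-of-the-envelope model in Java. It assumes a single tserver fills its buffer at the observed result rate before anything is sent, and it ignores everything else (compression, multiple tservers, scans that finish before the buffer fills). The 200-byte entry size and the 64K alternative are made-up illustration values, not numbers from this thread.

```java
final class FirstBatchDelay {
    /** Seconds needed to accumulate bufferBytes at the given result rate and entry size. */
    static double secondsToFill(long bufferBytes, double entriesPerSecond, double bytesPerEntry) {
        if (entriesPerSecond <= 0 || bytesPerEntry <= 0) {
            throw new IllegalArgumentException("rate and entry size must be positive");
        }
        return bufferBytes / (entriesPerSecond * bytesPerEntry);
    }

    public static void main(String[] args) {
        double bytesPerEntry = 200.0;            // assumption: "100s of bytes" per record
        long[] buffers = {1L << 20, 64L << 10};  // 1M as in the thread, plus a hypothetical 64K
        double[] rates = {10.0, 100.0};          // observed range of entries/s
        for (long buffer : buffers) {
            for (double rate : rates) {
                System.out.printf("buffer=%d bytes, rate=%.0f/s -> ~%.1f s to first batch%n",
                        buffer, rate, secondsToFill(buffer, rate, bytesPerEntry));
            }
        }
    }
}
```

Under these assumptions the delay scales linearly with the buffer size, which is the effect being described: a smaller buffer returns the first batch sooner at the price of more round trips.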
>> Number of rfiles could impact read performance as you have to do a merged-read over
all of the rfiles for a tablet.
>> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>>> I'm getting query results around 10-100 entries/s. However, it takes some time
after starting the data scan to actually have any positive query number. The ingest rate into
this table is about 10k entries/s.
>>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest
is overwhelming the resources?
>>> -----Original Message-----
>>> From: Josh Elser []
>>> Sent: Tuesday, May 20, 2014 2:42 PM
>>> To:
>>> Subject: Re: Improving Batchscanner Performance
>>> No, that is how it's done. The ranges that you provide to the BatchScanner are
binned to the tablets hosted by each tabletserver. It will then query up to numQueryThreads tservers
at once to fetch results in parallel.
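
The binning step can be pictured with a small, self-contained Java sketch. This is not the BatchScanner's actual code; it is a simplified model that assumes each tablet is identified by its (inclusive) end row and that rows are plain strings. `RangeBinning`, `binRowsByTablet` and `LAST_TABLET` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

final class RangeBinning {
    /** Label for the tablet after the final split point, which has no end row. */
    static final String LAST_TABLET = "<last tablet>";

    /** Groups rows by the tablet (named by its end row) that would contain them. */
    static Map<String, List<String>> binRowsByTablet(Collection<String> rows,
                                                     NavigableSet<String> splits) {
        Map<String, List<String>> bins = new TreeMap<>();
        for (String row : rows) {
            // A tablet covers (previous split, its own split], so the smallest
            // split >= row names the tablet that holds the row.
            String endRow = splits.ceiling(row);
            String tablet = (endRow == null) ? LAST_TABLET : endRow;
            bins.computeIfAbsent(tablet, k -> new ArrayList<>()).add(row);
        }
        return bins;
    }

    public static void main(String[] args) {
        NavigableSet<String> splits = new TreeSet<>(Arrays.asList("g", "p"));
        List<String> rows = Arrays.asList("a", "h", "z", "b", "g");
        // Rows scattered across the key space end up in several bins, and each
        // bin means a request to whichever server hosts that tablet.
        binRowsByTablet(rows, splits)
                .forEach((tablet, binned) -> System.out.println(tablet + " <- " + binned));
    }
}
```

The more tablets a set of rows is spread over, the more bins there are and the more servers need to be contacted for that one batch.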
>>> The point I was making is that you can only bin ranges within the scope of a
single BatchScanner, and if you were making repeated calls to your original function with
differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead
you to the cost of fetching random sets of rows and data.
>>> If the bandwidth of fetching the data is not a factor, I would probably agree
that random reads are an issue. Do you have more details you can give about how long it takes
to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)?
Are you getting an even distribution across your tservers, or are you hot-spotting on a few (the
monitor should help here)? It can sometimes be a bit of a balancing act with optimizing locality
while avoiding hotspots.
>>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>>> Josh,
>>>> The data is not significantly larger than the rows that I'm fetching. In
terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest
rate, so I don't think it's a network issue.
>>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random"
set of rows each time. I had assumed that the batchscanner would take the Collection of ranges
(when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits.
I'm guessing, based on the discussion, that it is not done that way.
>>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>> Thanks,
>>>> David
>>>> -----Original Message-----
>>>> From: Josh Elser []
>>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>>> To:
>>>> Subject: Re: Improving Batchscanner Performance
>>>> You actually stated it exactly here:
>>>>      > I complete the first scan in its entirety
>>>> Loading the data into a Collection also implies that you're loading the complete
set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>>      > Collection<Text> rows = getRowIDs(new Range("minRow",
>>>> "maxRow"), new Text("index"), "mytable", 10, 10000);  >
>>>> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches".
The client talks to server(s), reads some data and returns it to you. By virtue of you loading
these results from the Iterator into a Collection, you are consuming *all* results before
proceeding to fetch the data for the rows.
>>>> Now, if, like you said, looking up the rows is drastically faster than fetching
the data, there's a question as to why this is. Is it safe to assume that the data is much
larger than the rows you're fetching? Have you tried to see what the throughput of fetching
this data is? If it's bounded by network speed, you could try compressing the data in an iterator
server-side before returning it to the client.
>>>> You could also consider the locality of the rows that you're fetching --
are you fetching a "random" set of rows each time and paying a penalty of talking to each
server to fetch the data, when you could amortize the cost if you fetched the data for rows
that are close together? A large amount of data being returned is likely going to trump the
additional cost in talking to many servers.
>>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>>> Hi Josh,
>>>>> I should have clarified - I am using a batchscanner for both lookups.
I had thought of putting it into two different threads, but the first scan is typically an
order of magnitude faster than the second.
>>>>> The logic for upperbounding the results returned is outside of the method
I provided. Since there is a one-to-one relationship between rowIDs and records on the second
scan, I just limit the number of rows I send to this method.
>>>>> As for blocking, I'm not sure exactly what you mean. I complete the first
scan in its entirety before entering this method with the collection of Text rowIDs.
The method for that is:
>>>>> public Collection<Text> getRowIDs(Collection<Range> ranges,
Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>>              Set<Text> guids = new HashSet<Text>();
>>>>>              if (!ranges.isEmpty()) {
>>>>>                  BatchScanner scanner = conn.createBatchScanner(tablename,
new Authorizations(), queryThreads);
>>>>>                  scanner.setRanges(ranges);
>>>>>                  scanner.fetchColumnFamily(term);
>>>>>                  for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                      guids.add(entry.getKey().getColumnQualifier());
>>>>>                      if (guids.size() > limit) {
>>>>>                          scanner.close(); // release the scanner before bailing out
>>>>>                          return null;
>>>>>                      }
>>>>>                  }
>>>>>                  scanner.close();
>>>>>              }
>>>>>              return guids;
>>>>>          }
>>>>> Essentially, my query does:
>>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
>>>>> new Text("index"), "mytable", 10, 10000); Collection<byte[]> data
>>>>> = getRowData(rows, "mytable", 10);
>>>>> -----Original Message-----
>>>>> From: Josh Elser []
>>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>>> To:
>>>>> Subject: Re: Improving Batchscanner Performance
>>>>> Hi David,
>>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>>> Your BatchScanner is producing results, which your Scanner then consumes,
and you ultimately return those results to the client.
>>>>> The problem with your below implementation is that you're not going to
be polling your batchscanner as aggressively as you could be. You are blocking while you
fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered
splitting up the BatchScanner and Scanner code into two different threads?
>>>>> You could easily use an ArrayBlockingQueue (or similar) to pass results
from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement
in performance.
>>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner
for both lookups?
>>>>> One final warning, your current implementation could also hog heap very
badly if your batchscanner returns too many records. The producer/consumer I proposed should
help here a little bit, but you should still be asserting upper-bounds to avoid running out
of heap space in your client.
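
A minimal sketch of the producer/consumer arrangement suggested above, using only the JDK so it runs without a cluster. The index scan is stood in for by a plain `Iterator<String>` of row IDs and the data lookup by a `Consumer<List<String>>` that receives one batch at a time; in real code those would wrap the two scanners. `ScanPipeline`, `run`, and the parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ScanPipeline {
    /**
     * Streams row IDs from the producer through a bounded queue and hands them
     * to batchHandler in groups of at most batchSize. At most
     * queueCapacity + batchSize row IDs are held by the pipeline at any time.
     */
    static void run(Iterator<String> rowIds, Consumer<List<String>> batchHandler,
                    int queueCapacity, int batchSize) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);

        // Producer: stands in for iterating the first (index) scan.
        Thread producer = new Thread(() -> {
            try {
                while (rowIds.hasNext()) {
                    queue.put(rowIds.next()); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop producing if interrupted
            }
        });
        producer.setDaemon(true);
        producer.start();

        // Consumer: stands in for the second (data) lookup, done batch by batch.
        List<String> batch = new ArrayList<>(batchSize);
        try {
            while (true) {
                String id = queue.poll(50, TimeUnit.MILLISECONDS);
                if (id != null) {
                    batch.add(id);
                    if (batch.size() == batchSize) {
                        batchHandler.accept(batch);
                        batch = new ArrayList<>(batchSize);
                    }
                } else if (!producer.isAlive() && queue.isEmpty()) {
                    break; // producer finished and everything was drained
                }
            }
            if (!batch.isEmpty()) {
                batchHandler.accept(batch); // final partial batch
            }
        } finally {
            producer.interrupt(); // unblock the producer if the handler failed
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            ids.add("row" + i);
        }
        run(ids.iterator(), b -> System.out.println("lookup batch of " + b.size()), 4, 10);
    }
}
```

The bounded queue is what provides the upper bound mentioned above: the producer stalls instead of piling row IDs onto the heap, while the consumer can start its lookups before the first scan has finished.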
>>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>>> Hey everyone,
>>>>>> I'm trying to improve the query performance of batchscans on my data
table. I first scan over index tables, which returns a set of rowIDs that correspond to the
records I am interested in. This set of records is fairly randomly (and uniformly) distributed
across a large number of tablets, due to the randomness of the UID and the query itself. Then
I want to scan over my data table, which is set up as follows:
>>>>>> row              colFam          colQual         value
>>>>>> rowUID    --                     --                      byte[] of
>>>>>> These records are fairly small (100s of bytes), but numerous (I may
return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows
returned from the first query into a set of ranges to input into the batchscanner, and then
return those rows, retrieving the value from them.
>>>>>> // returns the data associated with the given collection of rows
>>>>>>           public Collection<byte[]> getRowData(Collection<Text>
rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>>               List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>>               if (!rows.isEmpty()) {
>>>>>>                   BatchScanner scanner = conn.createBatchScanner(tablename,
new Authorizations(), queryThreads);
>>>>>>                   List<Range> ranges = new ArrayList<Range>();
>>>>>>                   for (Text row : rows) {
>>>>>>                       ranges.add(new Range(row));
>>>>>>                   }
>>>>>>                   scanner.setRanges(ranges);
>>>>>>                   for (Map.Entry<Key, Value> entry : scanner) {
>>>>>>                       values.add(entry.getValue().get());
>>>>>>                   }
>>>>>>                   scanner.close();
>>>>>>               }
>>>>>>               return values;
>>>>>>           }
>>>>>> Is there a more efficient way to do this? I have index caches and
bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any
thoughts on how I can improve this?
>>>>>> Thanks,
>>>>>> David
