accumulo-user mailing list archives

From Josh Elser <josh.el...@gmail.com>
Subject Re: Request for Configuration Help for basic test. Tservers dying and only one tablet being used
Date Wed, 30 Jul 2014 03:20:11 GMT
Verifying that your random data is actually random is a smart 
thing :). Funny things can happen both when you get insufficiently random 
data and when you try to get very random data (draining /dev/urandom's 
entropy, as SecureRandom can).

You could also sample every n'th write your webserver sees to 
generate a client-side distribution, and then compare those results to 
the distribution of work server-side.
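
For example, a rough sketch of that sampling (the class and method names 
here are made up, not from your service): count the first character of 
every n'th rowID just before it goes to the BatchWriter, then line the 
counts up against the per-tablet ingest numbers on the monitor.

// Rough sketch (illustrative names): sample every n'th rowID and build a
// client-side histogram of first characters to compare with tablet activity.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class RowIdSampler
{
    private final int sampleEvery;
    private final AtomicLong seen = new AtomicLong();
    private final ConcurrentMap<Character, AtomicLong> histogram =
            new ConcurrentHashMap<Character, AtomicLong>();

    public RowIdSampler(int sampleEvery)
    {
        this.sampleEvery = sampleEvery;
    }

    // Call with each rowID just before handing its Mutation to the BatchWriter.
    public void maybeSample(String rowId)
    {
        if (rowId.isEmpty() || seen.incrementAndGet() % sampleEvery != 0)
        {
            return;
        }
        Character first = Character.valueOf(rowId.charAt(0));
        AtomicLong count = histogram.get(first);
        if (count == null)
        {
            AtomicLong fresh = new AtomicLong();
            AtomicLong existing = histogram.putIfAbsent(first, fresh);
            count = (existing == null) ? fresh : existing;
        }
        count.incrementAndGet();
    }

    // Dump the client-side distribution, e.g. at the end of the test run.
    public void report()
    {
        for (Map.Entry<Character, AtomicLong> e : histogram.entrySet())
        {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}

If the client-side histogram is flat but one tablet still takes nearly all 
of the writes, the problem is in how the keys map onto the split points 
rather than in the randomness itself.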

On 7/29/14, 4:56 PM, Mike Drob wrote:
> You should double-check your data; you might find that it's null-padded
> or something like that, which would screw up the splits. You can do a
> scan from the shell, which might give you hints.
>
>
> On Tue, Jul 29, 2014 at 3:53 PM, Pelton, Aaron A.
> <Aaron.Pelton@gd-ais.com> wrote:
>
>     I agree with the idea of pooling the writers.
>
>     As for the discussion of the keys: I get what you are saying about
>     choosing better keys for distribution based on the frequency of
>     chars in the English language. But for this test I'm just using
>     Apache RandomStringUtils to create a 2-char random alpha sequence to
>     prepend, so it should be a moderately distributed sampling of chars.
>     However, let me emphasize that I'm seeing 1 tablet getting
>     millions of entries in it, compared to the remaining 35 tablets
>     having no entries or only around 1k. To me that says something isn't
>     right.
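>
>     One thing I probably need to verify about that assumption (quick
>     standalone sketch below, not the test code): RandomStringUtils.randomAlphabetic
>     returns mixed-case letters, and my splits are only lowercase a-z plus
>     0-9, so any rowID that happens to start with an uppercase letter would
>     sort into the single tablet between the "9" and "a" split points. A
>     histogram of the generated prefixes would show it either way:
>
>     import java.util.Map;
>     import java.util.TreeMap;
>
>     // commons-lang3 here; the package is org.apache.commons.lang in older versions
>     import org.apache.commons.lang3.RandomStringUtils;
>
>     public class PrefixHistogram
>     {
>         public static void main(String[] args)
>         {
>             TreeMap<Character, Integer> counts = new TreeMap<Character, Integer>();
>             for (int i = 0; i < 100_000; i++)
>             {
>                 // presumably similar to the call the test uses for its 2-char prefix
>                 char first = RandomStringUtils.randomAlphabetic(2).charAt(0);
>                 Integer old = counts.get(first);
>                 counts.put(first, old == null ? 1 : old + 1);
>             }
>             for (Map.Entry<Character, Integer> e : counts.entrySet())
>             {
>                 System.out.println(e.getKey() + " -> " + e.getValue());
>             }
>         }
>     }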
>
>
>     -----Original Message-----
>     From: Josh Elser [mailto:josh.elser@gmail.com]
>     Sent: Tuesday, July 29, 2014 4:20 PM
>     To: user@accumulo.apache.org
>     Subject: Re: Request for Configuration Help for basic test. Tservers
>     dying and only one tablet being used
>
>     On 7/29/14, 3:20 PM, Pelton, Aaron A. wrote:
>      > To followup to two of your statements/questions:
>      >
>      > 1. Good, pre-splitting your table should help with random data,
>     but if you're only writing data to one tablet, you're stuck (very
>     similar to hot-spotting reducers in MapReduce jobs).
>      >
>      > - OK, so it's good that the data is pre-splitting, but maybe this is
>     conceptually something that I'm not grasping about Accumulo yet: I
>     thought specifying the pre-splits is what causes the table to span
>     multiple tablets on the various tservers initially.  However, the
>     core of the data appears to be in one specific tablet on one tserver.
>     Each tserver appears to have a few tablets allocated to it for the
>     table I'm working out of. So, I'm confused as to how to get the data
>     to write to more than just the one tablet/partition.  I would almost
>     think the keys I specified aren't being matched correctly against
>     incoming data then?
>
>     No, it sounds like you have the right idea. Many tablets make up
>     a table; the split points for a table are what define those tablet
>     boundaries. Consider a table where the rowIDs are English
>     words
>     (http://en.wikipedia.org/wiki/Letter_frequency#Relative_frequencies_of_the_first_letters_of_a_word_in_the_English_language).
>
>     If you split your table on each letter (a-z), you would still see
>     much more activity to the tablets which host words starting with
>     'a', 't', and 's' because you have significantly more data being
>     ingested into those tablets.
>
>     When designing a table (specifically the rowID of the key), it's
>     desirable to try to make the rowID as distributed as possible across
>     the entire table. This helps ensure even processing across all of
>     your nodes. Does that make sense?
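>
>     For illustration only (this isn't your code), a common way to get that
>     even spread is to derive the prefix from a hash of the natural ID
>     instead of a random string, so the bucket is both uniform and
>     reproducible at read time:
>
>     // Illustrative sketch: deterministic one-char bucket derived from a hash
>     // of the natural ID, matching pre-splits on 0-9 and a-z.
>     public final class RowIds
>     {
>         private static final String BUCKETS = "0123456789abcdefghijklmnopqrstuvwxyz";
>
>         // e.g. rowId("1234", 8876L) -> "<bucket char>1234:8876"
>         public static String rowId(String id, long timestamp)
>         {
>             int bucket = (id.hashCode() & 0x7fffffff) % BUCKETS.length();
>             return BUCKETS.charAt(bucket) + id + ":" + timestamp;
>         }
>     }
>
>     Because the bucket is computed from the id, the read side can recompute
>     it and prepend it before building the Range, so lookups by id still work.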
>
>      > 2. What do you actually do when you receive an HTTP request to
>     write to Accumulo. It sounds like you're reading data and then
>     writing? Is each HTTP request creating its own BatchWriter? More
>     insight to what a "write" looks like in your system (in terms of
>     Accumulo API calls) would help us make recommendations about more
>     efficient things you can do.
>      >
>      > Yes, each http request gets its own reference to a writer or
>     scanner, which is closed when the result is returned from the http
>     request.  There are two rest services. One transforms the data and
>     performs some indexing based on it and then sends both data and index
>     to a BatchWriter. The sample code for the data being written is
>     below. The indexes being written are similar but use different
>     family and qualifier values.
>      >
>      >          Text rowId = new Text(id + ":" + time);
>      >          Text fam = new Text(COLUMN_FAMILY_KLV);
>      >          Text qual = new Text("");
>      >          Value val = new Value(data.getBytes());
>      >
>      >          Mutation mut = new Mutation(rowId);
>      >          mut.put(fam, qual, val);
>      >
>      >          long memBuf = 1_000_000L;
>      >          long timeout = 1000L;
>      >          int numThreads = 10;
>      >
>      >          BatchWriter writer = null;
>      >          try
>      >          {
>      >              writer = conn.createBatchWriter(TABLE_NAME, memBuf,
>     timeout, numThreads);
>      >              writer.addMutation(mut);
>      >          }
>      >          catch (Exception x)
>      >          {
>      >              // x.printStackTrace();
>      >              logger.error(x.toString(), x);
>      >              result = "ERROR";
>      >          }
>      >          finally
>      >          {
>      >              try
>      >              {
>      >                  if (writer != null)
>      >                  {
>      >                      writer.close();
>      >                  }
>      >              }
>      >              catch (Exception x)
>      >              {
>      >                  // x.printStackTrace();
>      >                  logger.error(x.toString(), x);
>      >                  result = "ERROR";
>      >              }
>      >          }
>
>     You could try to make a threadpool for BatchWriters instead of
>     creating a new one for each HTTP thread. This might help amortize
>     the RPC cost by sending more than one mutation at a time (the
>     BatchWriter should be thread safe in this regard). You then just
>     want to call flush() instead of closing the BatchWriter.
>
>     I remember seeing that there are some optimizations within the
>     BatchWriter to write a single Mutation, but if you're really trying
>     to saturate your system, using fewer BatchWriters would likely help
>     you realize more throughput.
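>
>     Just to sketch that idea (the wiring and config values here are made
>     up, and this uses the 1.5 BatchWriterConfig API): a single long-lived
>     BatchWriter shared by the HTTP threads, with flush() per request
>     instead of close():
>
>     import java.util.concurrent.TimeUnit;
>
>     import org.apache.accumulo.core.client.BatchWriter;
>     import org.apache.accumulo.core.client.BatchWriterConfig;
>     import org.apache.accumulo.core.client.Connector;
>     import org.apache.accumulo.core.client.MutationsRejectedException;
>     import org.apache.accumulo.core.client.TableNotFoundException;
>     import org.apache.accumulo.core.data.Mutation;
>
>     // Sketch: one shared writer for the whole webapp instead of one per request.
>     public class SharedWriter
>     {
>         private final BatchWriter writer;
>
>         public SharedWriter(Connector conn, String tableName) throws TableNotFoundException
>         {
>             BatchWriterConfig config = new BatchWriterConfig();
>             config.setMaxMemory(10_000_000L);                   // larger shared buffer
>             config.setMaxLatency(1000L, TimeUnit.MILLISECONDS); // background flush interval
>             config.setMaxWriteThreads(10);
>             this.writer = conn.createBatchWriter(tableName, config);
>         }
>
>         // Called from each HTTP request thread (addMutation is thread safe).
>         public void write(Mutation mut) throws MutationsRejectedException
>         {
>             writer.addMutation(mut);
>             writer.flush(); // or drop this and let maxLatency batch across requests
>         }
>
>         // Call once when the webapp shuts down.
>         public void shutdown() throws MutationsRejectedException
>         {
>             writer.close();
>         }
>     }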
>
>      > At the beginning of the test, a known control data
>     range is created and uploaded. For the duration of the test, while
>     ongoing writes occur, queries on data in that control
>     range are performed.  The rest service that handles the read
>     eventually hits this:
>      >
>      >          ArrayList<String> latlons = new ArrayList<String>();
>      >          Authorizations auths = new Authorizations();
>      >
>      >          Scanner scan = null;
>      >          try
>      >          {
>      >              scan = conn.createScanner(TABLE_NAME, auths);
>      >              scan.setRange(new Range(id + ":0", id + "::")); //
>     all times
>      >              scan.fetchColumnFamily(new Text(COLUMN_FAMILY_KLV));
>      >
>      >              for (Map.Entry<Key, Value> e : scan)
>      >              {
>      >                  // do stuff with e
>      >              }
>      >          }
>      >          catch (TableNotFoundException x)
>      >          {
>      >              LOGGER.fatal("The table " + TABLE_NAME + " could not
>     be found.", x);
>      >          }
>      >          finally
>      >          {
>      >              if (scan != null)
>      >              {
>      >                  scan.close();
>      >              }
>      >          }
>      >
>      > -----Original Message-----
>      > From: Josh Elser [mailto:josh.elser@gmail.com]
>      > Sent: Tuesday, July 29, 2014 1:43 PM
>      > To: user@accumulo.apache.org
>      > Subject: Re: Request for Configuration Help for basic test. Tservers
>      > dying and only one tablet being used
>      >
>      > Some comments inline
>      >
>      > On 7/29/14, 1:07 PM, Pelton, Aaron A. wrote:
>      >> Hi All,
>      >>
>      >> I am new to Accumulo and I apologize if the answers to my questions
>      >> are already posted somewhere. I've done a fair amount of
>     googling and
>      >> poking around the manuals etc.
>      >>
>      >> I am just doing a simple test with two machines, one producing about
>      >> 600 threads on the network to stream simultaneous writes to a rest
>      >> service, and the other producing about 300 threads on the network to
>      >> perform simultaneous queries to a rest service. The rest service has
>      >> Accumulo API calls in it to write out and query data.
>      >>
>      >> I have inherited the following configuration
>      >>
>      >> -Squirrel Bundle distribution of Accumulo 1.5.0
>      >>
>      >> -1 Master machine to start and stop Accumulo services on
>      >>
>      >> -12 data nodes running tservers. The first three of these also
>      >> running the zookeeper instances. And, nodes 4-6 running tracers.
>      >>
>      >> I have noticed the following issues with configuration and changed
>      >> them as follows
>      >>
>      >> -Changed swapiness to 0 on all nodes
>      >>
>      >> -Was still getting OutOfMemoryExceptions after the above, and after
>      >> running the test for a long duration. Thus, increased the Java heap
>      >> size from 1g to 4g, which is still far below the physical RAM on the nodes.
>      >>
>      >> -Increased java heap from 1g to 2g on master node
>      >>
>      >> -I also increased the following properties
>      >>
>      >> <property>
>      >>   <name>tserver.memory.maps.max</name>
>      >>   <value>2G</value>
>      >> </property>
>      >>
>      >> <property>
>      >>   <name>tserver.cache.data.size</name>
>      >>   <value>512M</value>
>      >> </property>
>      >>
>      >> <property>
>      >>   <name>tserver.cache.index.size</name>
>      >>   <value>512M</value>
>      >> </property>
>      >>
>      >> -Changed the ulimit for virtual memory to unlimited
>      >>
>      >> -Changed the ulimit for files opened to 65536
>      >>
>      >> -Changed the ulimit for max user processes to 1024
>      >
>      > These all look good. Just keep in mind that tserver.cache.data.size
>      > and tserver.cache.index.size will be on the JVM heap while
>      > tserver.memory.maps.max is off heap (assuming you're using the native
>      > maps which you very well should be -- I assume Sqrrl's distro set
>     this
>      > up for you)
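>      >
>      > Roughly, the relationship between those knobs looks like this (same
>      > values as above; the native-maps flag shown is the stock Accumulo
>      > property name, included just for illustration):
>      >
>      > <!-- sketch: which settings live on vs. off the tserver JVM heap -->
>      > <property>
>      >   <name>tserver.memory.maps.native.enabled</name>
>      >   <value>true</value>  <!-- native map => maps.max is allocated off-heap -->
>      > </property>
>      > <property>
>      >   <name>tserver.memory.maps.max</name>
>      >   <value>2G</value>    <!-- off-heap when native maps are enabled -->
>      > </property>
>      > <property>
>      >   <name>tserver.cache.data.size</name>
>      >   <value>512M</value>  <!-- JVM heap -->
>      > </property>
>      > <property>
>      >   <name>tserver.cache.index.size</name>
>      >   <value>512M</value>  <!-- JVM heap -->
>      > </property>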
>      >
>      >> -A tomcat instance with a server socket accepting up to 1,000
>     threads
>      >> / user connections to a rest service that eventually makes a read /
>      >> write out to an Accumulo connector instance.
>      >>
>      >> -Changed the zookeeper connection limit max to 0 since this is
>     just a
>      >> test environment
>      >>
>      >> -Noticed that code I had inherited didn't have close calls on the
>      >> scanner objects in the rest service b/c it was originally designed
>      >> for Accumulo 1.4 in which there wasn't such an API.
>      >
>      > Scanners can clean up after themselves, whereas BatchScanners
>     don't. A
>      > close method was added to ScannerBase (the parent class of
>     Scanner and
>      > BatchScanner) to let you seamlessly swap out a Scanner with a
>     BatchScanner (and vice versa) while not leaking any resources. In
>     short, you can call Scanner#close, but it's just a no-op.
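>
>     For example (just a sketch), if the scan-and-close logic is written
>     against ScannerBase, the rest service can hand it either a Scanner or
>     a BatchScanner without changing this code:
>
>     import java.util.Map;
>
>     import org.apache.accumulo.core.client.ScannerBase;
>     import org.apache.accumulo.core.data.Key;
>     import org.apache.accumulo.core.data.Value;
>
>     public final class ScanUtil
>     {
>         // Works for Scanner and BatchScanner alike; close() is a no-op for
>         // the former and releases query threads for the latter.
>         public static long countEntries(ScannerBase scanner)
>         {
>             try
>             {
>                 long count = 0;
>                 for (Map.Entry<Key, Value> entry : scanner)
>                 {
>                     count++;
>                 }
>                 return count;
>             }
>             finally
>             {
>                 scanner.close();
>             }
>         }
>     }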
>      >
>      >> -This may be wrong, but in an effort to see my ~900 connections
>      >> simultaneously get as much access to db writes/reads for
>     servicing, I
>      >> up'd some thread counts for
>      >>
>      >> <property>
>      >>   <name>tserver.server.threads.minimum</name>
>      >>   <value>75</value>
>      >> </property>
>      >>
>      >> <property>
>      >>   <name>master.server.threads.minimum</name>
>      >>   <value>300</value>
>      >> </property>
>      >>
>      >> I have a couple of problems to note:
>      >>
>      >> 1.Ingest speeds seem kinda slow. I would anticipate network overhead
>      >> but not enough to reduce writes to 125 records / sec when each
>     record
>      >> is only a few kB.
>      >
>      > What do you actually do when you receive an HTTP request to write
>     to Accumulo. It sounds like you're reading data and then writing? Is
>     each HTTP request creating its own BatchWriter? More insight to what
>     a "write" looks like in your system (in terms of Accumulo API calls)
>     would help us make recommendations about more efficient things you
>     can do.
>      >
>      >> a.I believe this is because I'm only seeing one tserver
>      >> primarily active at ingesting, with one tablet in particular for the
>      >> table receiving the bulk of the data.
>      >>
>      >> b.I have added pre-splits upon table creation for each letter of the
>      >> alphabet, plus the digits 0-9. As this is a test with a simple loop
>      >> creating ID values, I throw 2 random alpha chars in front of the
>      >> generated number in my loop and use that as the ID, to hopefully
>      >> distribute the IDs across tablets for this table.  A record ID
>      >> ingested might look like "bk1234:8876": 2 random chars, the orig ID
>      >> value, a colon, and a timestamp.  Sample pre-splitting (granted, the
>      >> array could be constructed more gracefully, but for a quick test, meh):
>      >>
>      >> try
>      >> {
>      >>     conn.tableOperations().create(TABLE_NAME);
>      >>
>      >>     final SortedSet<Text> sortedSplits = new TreeSet<Text>();
>      >>     for (String binPrefix : new String[] { "a", "b", "c", "d", "e",
>      >>             "f", "g", "h", "i", "j", "k", "l", "m",
>      >>             "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "1",
>      >>             "2", "3", "4", "5", "6", "7",
>      >>             "8", "9", "0" })
>      >>     {
>      >>         sortedSplits.add(new Text(binPrefix));
>      >>     }
>      >>
>      >>     conn.tableOperations().addSplits(TABLE_NAME, sortedSplits);
>      >> }
>      >> catch (TableExistsException | TableNotFoundException exception)
>      >> {
>      >>     LOGGER.warn("Could not create table or sorted splits", exception);
>      >> }
>      >
>      > Good, pre-splitting your table should help with random data, but
>     if you're only writing data to one tablet, you're stuck (very
>     similar to hot-spotting reducers in MapReduce jobs).
>      >
>      >> 2.Tservers running on the data nodes halt after about 4 hours of
>      >> processing.  I'm attempting to ingest into the billions, hopefully
>      >> trillions of records range.  Generally the ones that aren't under
>      >> load in the beginning die first, and the one that is handling
>      >> the bulk of the load typically crashes last. In the beginning, I
>      >> noticed the OutOfMemoryException in the tserver logs, but haven't
>      >> seen that in the past few runs after the memory adjustments. In fact
>      >> the tserver log doesn't say anything about why it stopped.  Also
>      >> didn't notice anything unusual in the zookeeper log other than
>     the occasional CancelledKeyException.
>      >
>      > Make sure you check the tserver_hostname.debug.log,
>     tserver_hostname.out, and tserver_hostname.err files. OOMEs sometimes
>     don't make it to the log file because the JVM is tearing down. You
>     should be able to find something as to why the tserver stopped.
>      >
>      >> 3.Lastly, can anyone approximate, with the 12 nodes that I have, what
>      >> kind of ingest speed (in records per second) I should see if things
>      >> were configured correctly, given small record sizes of a few kB.
>      >> And, is anything obviously wrong with the configurations mentioned
>      >> above that, if fixed, would improve throughput?
>      >
>      > Generally, a "normal" machine will be able to ingest about
>     200k records/sec at ~150 bytes each, or roughly 30MB/s.
>      >
>      > You might also want to try increasing tserver.mutation.queue.max
>     to 1M in accumulo-site.xml (restart required). You can find some
>     extra information about that in the release notes:
>      > http://accumulo.apache.org/release_notes/1.5.1.html#known-issues.
>     Not sure if Sqrrl's distribution has done this already for you.
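>
>     i.e., something like this in accumulo-site.xml (value per the note above):
>
>     <property>
>       <name>tserver.mutation.queue.max</name>
>       <value>1M</value>
>     </property>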
>      >
>      >
>      >> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>      >>
>      >> --
>      >>
>      >> Sincerely,
>      >>
>      >> Aaron Pelton
>      >>
>
>
