accumulo-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mike Drob <mad...@cloudera.com>
Subject Re: Request for Configuration Help for basic test. Tservers dying and only one tablet being used
Date Tue, 29 Jul 2014 20:56:41 GMT
You should double-check your data, you might find that it's null padded or
something like that which would screw up the splits. You can do a scan from
the shell which might give you hints.


On Tue, Jul 29, 2014 at 3:53 PM, Pelton, Aaron A. <Aaron.Pelton@gd-ais.com>
wrote:

> I agree with the idea of pooling the writers.
>
> As for the discussion of the keys. I get what you are saying with choosing
> better keys for distribution based on frequency of the chars in the English
> language. But, for this test I'm just using apache RandomStringUtils to
> create a 2 char random alpha sequence to prepend, so it should be a
> moderately distributed sampling of chars. However, let me emphasize that I
> mean I'm seeing 1 tablet getting millions of entries in it, compared to the
> remaining 35 tablets having no entries or just like 1k. To me that says
> something isn't right.
>
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, July 29, 2014 4:20 PM
> To: user@accumulo.apache.org
> Subject: Re: Request for Configuration Help for basic test. Tservers dying
> and only one tablet being used
>
> On 7/29/14, 3:20 PM, Pelton, Aaron A. wrote:
> > To followup to two of your statements/questions:
> >
> > 1. Good, pre-splitting your table should help with random data, but if
> you're only writing data to one tablet, you're stuck (very similar to
> hot-spotting reducers in MapReduce jobs).
> >
> > - OK so its good that the data is presplitting, but maybe this is
> conceptually something that I'm not grasping about accumulo yet, but I
> thought specifying the pre-splits is what causes the table to span multiple
> tablets on the various tserver initially.  However, the core of the data
> appears to be in one specific tablet on on tserver. Each tserver appears to
> have a few tablets allocated to it for the table I'm working out of. So,
> I'm confused as to how to get the data to write to more than just the one
> tablet/partition.  I would almost think my keys I specified aren't being
> matched correctly against incoming data then?
>
> No, it sounds like you have the idea correctly. Many tablets make up a
> table, the split points for a table are what defines those tablet
> boundaries. Consider you have a table where the rowID are English words (
> http://en.wikipedia.org/wiki/Letter_frequency#Relative_frequencies_of_the_first_letters_of_a_word_in_the_English_language
> ).
>
> If you split your table on each letter (a-z), you would still see much
> more activity to the tablets which host words starting with 'a', 't', and
> 's' because you have significantly more data being ingested into those
> tablets.
>
> When designing a table (specifically the rowID of the key), it's desirable
> to try to make the rowID as distributed as possible across the entire
> table. This helps ensure even processing across all of your nodes. Does
> that make sense?
>
> > 2. What do you actually do when you receive an HTTP request to write to
> Accumulo. It sounds like you're reading data and then writing? Is each HTTP
> request creating its own BatchWriter? More insight to what a "write" looks
> like in your system (in terms of Accumulo API calls) would help us make
> recommendations about more efficient things you can do.
> >
> > Yes each http request gets its own reference to a writer or scanner,
> which is closed when thre result is returned from the http request.  There
> are two rest services. One transforms the data and preforms some indexes
> based on it and then sends both data and index to a BatchWriter. The sample
> code for the data being written is below. The indexes being written are
> similar but use different family and qualifier values.
> >
> >          Text rowId = new Text(id + ":" + time);
> >          Text fam = new Text(COLUMN_FAMILY_KLV);
> >          Text qual = new Text("");
> >          Value val = new Value(data.getBytes());
> >
> >          Mutation mut = new Mutation(rowId);
> >          mut.put(fam, qual, val);
> >
> >          long memBuf = 1_000_000L;
> >          long timeout = 1000L;
> >          int numThreads = 10;
> >
> >          BatchWriter writer = null;
> >          try
> >          {
> >              writer = conn.createBatchWriter(TABLE_NAME, memBuf,
> timeout, numThreads);
> >              writer.addMutation(mut);
> >          }
> >          catch (Exception x)
> >          {
> >              // x.printStackTrace();
> >              logger.error(x.toString(), x);
> >              result = "ERROR";
> >          }
> >          finally
> >          {
> >              try
> >              {
> >                  if (writer != null)
> >                  {
> >                      writer.close();
> >                  }
> >              }
> >              catch (Exception x)
> >              {
> >                  // x.printStackTrace();
> >                  logger.error(x.toString(), x);
> >                  result = "ERROR";
> >              }
> >          }
>
> You could try to make a threadpool for BatchWriters instead of creating a
> new one for each HTTP thread. This might help amortize the RPC cost by
> sending more than one mutation at a time (the BatchWriter should be thread
> safe in this regard). You then just want to call flush() instead of closing
> the BatchWriter.
>
> I remember seeing that there are some optimizations within the BatchWriter
> to write a single Mutation, but if you're really trying to saturate your
> system, using fewer BatchWriters would likely help you realize more
> throughput.
>
> > At the beginning of the test, a known subset of control data range is
> created and uploaded. For the duration of the heart of the test while
> ongoing writes occur, queries upon data in that control range are
> performed.  The rest service that handles the read eventually hits this:
> >
> >          ArrayList<String> latlons = new ArrayList<String>();
> >          Authorizations auths = new Authorizations();
> >
> >          Scanner scan = null;
> >          try
> >          {
> >              scan = conn.createScanner(TABLE_NAME, auths);
> >              scan.setRange(new Range(id + ":0", id + "::")); // all times
> >              scan.fetchColumnFamily(new Text(COLUMN_FAMILY_KLV));
> >
> >              for (Map.Entry<Key, Value> e : scan)
> >              {
> >                  // do stuff with e
> >              }
> >          }
> >          catch (TableNotFoundException x)
> >          {
> >              LOGGER.fatal("The table " + TABLE_NAME + " could not be
> found.", x);
> >          }
> >          finally
> >          {
> >              if (scan != null)
> >              {
> >                  scan.close();
> >              }
> >          }
> >
> > -----Original Message-----
> > From: Josh Elser [mailto:josh.elser@gmail.com]
> > Sent: Tuesday, July 29, 2014 1:43 PM
> > To: user@accumulo.apache.org
> > Subject: Re: Request for Configuration Help for basic test. Tservers
> > dying and only one tablet being used
> >
> > Some comments inline
> >
> > On 7/29/14, 1:07 PM, Pelton, Aaron A. wrote:
> >> Hi All,
> >>
> >> I am new to Accumulo and I apologize if the answers to my questions
> >> are already posted somewhere. I've done a fair amount of googling and
> >> poking around the manuals etc.
> >>
> >> I am just doing a simple test with two machines, one producing about
> >> 600 threads on the network to stream simultaneous writes to a rest
> >> service, and the other producing about 300 threads on the network to
> >> perform simultaneous queries to a rest service. The rest service has
> >> Accumulo API calls in it to write out and query data.
> >>
> >> I have inherited the following configuration
> >>
> >> -Squirrel Bundle distribution of Accumulo 1.5.0
> >>
> >> -1 Master machine to start and stop Accumulo services on
> >>
> >> -12 data nodes running tservers. The first three of these also
> >> running the zookeeper instances. And, nodes 4-6 running tracers.
> >>
> >> I have noticed the following issues with configuration and changed
> >> them as follows
> >>
> >> -Changed swapiness to 0 on all nodes
> >>
> >> -Was getting OutOfMemoryExceptions after the above still, and after
> >> running test for long duration. Thus, increased Java Heap size from
> >> 1g to 4g, which is still far below the physical ram on the nodes.
> >>
> >> -Increased java heap from 1g to 2g on master node
> >>
> >> -I also increased the following properties
> >>
> >> o  <property>
> >>
> >> o    <name>tserver.memory.maps.max</name>
> >>
> >> o    <value>2G</value>
> >>
> >> o  </property>
> >>
> >> o
> >>
> >> o  <property>
> >>
> >> o    <name>tserver.cache.data.size</name>
> >>
> >> o    <value>512M</value>
> >>
> >> o  </property>
> >>
> >> o
> >>
> >> o  <property>
> >>
> >> o    <name>tserver.cache.index.size</name>
> >>
> >> o    <value>512M</value>
> >>
> >> o  </property>
> >>
> >> -Changed the ulimit for virtual memory to unlimited
> >>
> >> -Changed the ulimit for files opened to 65536
> >>
> >> -Changed the ulimit for max user processes to 1024
> >
> > These all look good. Just keep in mind that tserver.cache.data.size
> > and tserver.cache.index.size will be on the JVM heap while
> > tserver.memory.maps.max is off heap (assuming you're using the native
> > maps which you very well should be -- I assume Sqrrl's distro set this
> > up for you)
> >
> >> -A tomcat instance with a server socket accepting up to 1,000 threads
> >> / user connections to a rest service that eventually makes a read /
> >> write out to an Accumulo connector instance.
> >>
> >> -Changed the zookeeper connection limit max to 0 since this is just a
> >> test environment
> >>
> >> -Noticed that code I had inherited didn't have close calls on the
> >> scanner objects in the rest service b/c it was originally designed
> >> for Accumulo 1.4 in which there wasn't such an API.
> >
> > Scanners can clean up after themselves, whereas BatchScanners don't. A
> > close method was added to ScannerBase (the parent class of Scanner and
> > BatchScanner) to let you seamlessly swap out a Scanner with a
> BatchScanner (and vice versa) while not leaking any resources. In short,
> you can call Scanner#close, but it's just a no-op.
> >
> >> -This may be wrong, but in an effort to see my ~900 connections
> >> simultaneously get as much access to db writes/reads for servicing, I
> >> up'd some thread counts for
> >>
> >> o  <property>
> >>
> >> o    <name>tserver.server.threads.minimum</name>
> >>
> >> o    <value>75</value>
> >>
> >> o  </property>
> >>
> >> o
> >>
> >> o  <property>
> >>
> >> o    <name>master.server.threads.minimum</name>
> >>
> >> o    <value>300</value>
> >>
> >> o  </property>
> >>
> >> I have a couple of problems to note:
> >>
> >> 1.Ingest speeds seem kinda slow. I would anticipate network overhead
> >> but not enough to reduce writes to 125 records / sec when each record
> >> is only a few kB.
> >
> > What do you actually do when you receive an HTTP request to write to
> Accumulo. It sounds like you're reading data and then writing? Is each HTTP
> request creating its own BatchWriter? More insight to what a "write" looks
> like in your system (in terms of Accumulo API calls) would help us make
> recommendations about more efficient things you can do.
> >
> >> a.I believe this is due to the fact that I'm only seeing one tserver
> >> primarily active at ingesting, with one tbalet in particular for the
> >> table receiving the bulk of the data.
> >>
> >> b.I have added pre-splits upon table creation for each letter of the
> >> alphabet, plus the digits 0-9. As this is a test with a simple loop
> >> creating ID values, I throw 2 alpha chars randomly in front of the
> >> generated number in my loop and use that as the ID to distribute
> >> hopefully the IDs across tablets for this table.  A record ID
> >> ingested might look like "bk1234:8876", whereby it has random 2
> >> chars, orig ID value, colon, and a timestamp.  Sample pre-splitting:
> >> (Granted the array could be constructed more gracefully, but for a
> quick test, meh).
> >>
> >> *try*
> >>
> >>           {
> >>
> >> conn.tableOperations().create(/TABLE_NAME/);
> >>
> >> *final*SortedSet<Text> sortedSplits = *new*TreeSet<Text>();
> >>
> >> *for*(String binPrefix : *new*String[] { "a", "b", "c", "d", "e",
> >> "f", "g", "h", "i", "j", "k", "l", "m",
> >>
> >> "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "1",
> >> "2", "3", "4", "5", "6", "7",
> >>
> >> "8", "9", "0"})
> >>
> >>               {
> >>
> >>                   sortedSplits.add(*new*Text(binPrefix));
> >>
> >>               }
> >>
> >> conn.tableOperations().addSplits(/TABLE_NAME/, sortedSplits);
> >>
> >>           }
> >>
> >> *catch*(TableExistsException | TableNotFoundException exception)
> >>
> >>           {
> >>
> >> /LOGGER/.warn("Could not create table or sorted splits", exception);
> >>
> >>           }
> >
> > Good, pre-splitting your table should help with random data, but if
> you're only writing data to one tablet, you're stuck (very similar to
> hot-spotting reducers in MapReduce jobs).
> >
> >> 2.Tservers running on the data node halt after about 4 hours in of
> >> processing.  I'm attempting to ingest into the billions, hopefully
> >> trillions of records range.  Generally it is the ones that aren't
> >> under load in the beginning, until finally the one that is handling
> >> the bulk of the load crashes typically last. In the beginning, I
> >> noticed in the tserver logs the OutOfMemoryException, but haven't
> >> seen that in the past few runs after the memory adjustments. In fact
> >> the tserver log doesn't say anything about why it stopped.  Also
> >> didn't notice anything unusual in the zookeeper log other than the
> occasional CancelledKeyException.
> >
> > Make sure you check both the tserver_hostname.debug.log,
> tserver_hostname.out and tserver_hostname.err files. OOMEs sometimes don't
> make it to the log file because of the JVM tearing down. You should be able
> to find something as to why the tserver stopped.
> >
> >> 3.Lastly can anyone approximate with the 12 nodes that I have, what
> >> kind of ingest speed should I see if things were configured correctly
> >> in number of records per second based on small record sizes of a few kB.
> >> And, is anything obviously wrong with the configurations mentioned
> >> above that would improve throughput?
> >
> > Generally, a "normal" machine will be able to do ingest of about 200k
> records at 150bytes for ~30MB/s.
> >
> > You might also want to try increasing tserver.mutation.queue.max to 1M
> in accumulo-site.xml (restart required). You can find some extra
> information about that on the releases notes:
> > http://accumulo.apache.org/release_notes/1.5.1.html#known-issues. Not
> sure if Sqrrl's distribution has done this already for you.
> >
> >
> >> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> >>
> >> --
> >>
> >> Sincerely,
> >>
> >> Aaron Pelton
> >>
>

Mime
View raw message