> Sure, but increasing the number of consumers can increase the throughput (without increasing the number of Kudu tablet servers).

I see. Makes sense. I'll test that later.

> Currently, if you run 'top' on the TS nodes, do you see them using a high amount of CPU? Similar question for 'iostat -dxm 1' - high IO utilization? My guess is that at 15k/sec you are hardly utilizing the nodes, and you're mostly bound by round trip latencies, etc.

From the top and iostat commands, the TS nodes seem pretty under-utilized. CPU usage is less than 10%.

> In manual flush mode, it's up to you to determine how big your batches are. It will buffer until you call 'Flush()'. So you could wait until you've accumulated way more than 1000 to flush.

Got it. I meant that the default buffer size is 1000. I found out that I need to bump this up in order to avoid the "buffer is too big" error.

> In your AUTO_FLUSH test, were you still calling Flush()?

Yes.

> Given this, are you hash-partitioning on just the UUID portion of the PK? i.e., if your PK is (uuid, timestamp), you could hash-partition on the UUID. This should ensure that you get pretty good batching of the writes.

Yes, I only hash-partitioned on the UUID portion. 

Best,
Chao

On Tue, Oct 31, 2017 at 11:25 PM, Todd Lipcon <todd@cloudera.com> wrote:


On Tue, Oct 31, 2017 at 11:14 PM, Chao Sun <sunchao@uber.com> wrote:
Thanks Zhen and Todd. 

Yes, increasing the number of consumers will definitely help, but we also want to test the best throughput we can get from Kudu.

Sure, but increasing the number of consumers can increase the throughput (without increasing the number of Kudu tablet servers).

Currently, if you run 'top' on the TS nodes, do you see them using a high amount of CPU? Similar question for 'iostat -dxm 1' - high IO utilization? My guess is that at 15k/sec you are hardly utilizing the nodes, and you're mostly bound by round trip latencies, etc.
 

I think the default batch size is 1000 rows?

In manual flush mode, it's up to you to determine how big your batches are. It will buffer until you call 'Flush()'. So you could wait until you've accumulated way more than 1000 to flush.
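[Editor's sketch] The buffering behaviour described above can be illustrated without a cluster. The class below is a hypothetical stand-in, not part of the Kudu client: `BatchingWriter` and its sink are invented names, and the sink plays the role of one flush round trip. In the Kudu Java client the corresponding calls would be `KuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)`, `apply()` and `flush()`, with `setMutationBufferSpace()` sized to hold a full batch; treat that mapping as a rough guide rather than a tested recipe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a session in manual flush mode (not a Kudu class).
final class BatchingWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;   // stands in for one flush round trip
    private final List<T> buffer = new ArrayList<>();
    private int flushCount = 0;

    BatchingWriter(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Analogous to session.apply(op): buffer the row, no round trip yet.
    void apply(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Analogous to session.flush(): send everything buffered so far.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}

final class BatchingDemo {
    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        BatchingWriter<String> writer =
            new BatchingWriter<>(10_000, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 25_000; i++) {
            writer.apply("row-" + i);
        }
        writer.flush();  // drain the partial batch left in the buffer
        System.out.println(writer.flushCount() + " flushes, sizes " + sizes);
    }
}
```

With a batch size of 10,000 and 25,000 rows, the demo performs three flushes instead of 25,000 individual round trips.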
 
I tested with a few different values between 1000 and 200000, but always got somewhere between 15K and 20K per sec. I also tried the auto flush background mode and 32 hash partitions, but the results were similar.

In your AUTO_FLUSH test, were you still calling Flush()?
 
The primary key is UUID + some string column though - they always come in batches, e.g., 300 rows for uuid1 followed by 400 rows for uuid2, etc. 

Given this, are you hash-partitioning on just the UUID portion of the PK? i.e., if your PK is (uuid, timestamp), you could hash-partition on the UUID. This should ensure that you get pretty good batching of the writes.
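[Editor's sketch] A small simulation of why hashing on the UUID alone helps when rows arrive in runs that share a UUID. It uses Java's `String.hashCode()` as a stand-in for the partitioning hash, which is an assumption made purely for illustration (Kudu uses its own hash function), and `HashPartitionDemo`, `bucket` and `bucketsTouched` are hypothetical names. In the Kudu Java client, hash partitioning is declared at table creation with `CreateTableOptions.addHashPartitions(columns, buckets)`.

```java
import java.util.HashSet;
import java.util.Set;

final class HashPartitionDemo {
    // Math.floorMod keeps the bucket index non-negative for negative hash codes.
    static int bucket(Object key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Counts the distinct buckets touched by a run of rows that share one UUID.
    // hashOnUuidOnly = true  -> partition key is the UUID alone
    // hashOnUuidOnly = false -> partition key is (UUID, second PK column)
    static int bucketsTouched(String uuid, int rows, int numBuckets, boolean hashOnUuidOnly) {
        Set<Integer> touched = new HashSet<>();
        for (int i = 0; i < rows; i++) {
            String secondColumn = "col-" + i;  // made-up value for the second PK column
            Object key = hashOnUuidOnly ? uuid : uuid + "|" + secondColumn;
            touched.add(bucket(key, numBuckets));
        }
        return touched.size();
    }

    public static void main(String[] args) {
        String uuid = "123e4567-e89b-12d3-a456-426614174000";
        System.out.println("hash(uuid):         "
            + bucketsTouched(uuid, 300, 128, true) + " bucket(s)");
        System.out.println("hash(uuid, column): "
            + bucketsTouched(uuid, 300, 128, false) + " bucket(s)");
    }
}
```

A run of 300 rows for one UUID always lands in a single bucket when only the UUID is hashed, so the whole run can travel to one tablet together. Hashing on the full key scatters the same run across many buckets.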

Todd


On Tue, Oct 31, 2017 at 6:25 PM, Todd Lipcon <todd@cloudera.com> wrote:
In addition to what Zhen suggests, I'm also curious how you are sizing your batches in manual-flush mode? With 128 hash partitions, each batch is generating 128 RPCs, so if for example you are only batching 1000 rows at a time, you'll end up with a lot of fixed overhead in each RPC to insert just 1000/128 = ~8 rows.
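[Editor's sketch] The arithmetic above, worked for a few batch sizes. It assumes every batch is spread evenly over all hash partitions, which is the case being described here; `RpcFanout` and `rowsPerRpc` are hypothetical names used only for this illustration.

```java
final class RpcFanout {
    // Average rows carried by each RPC when a batch is spread evenly
    // over all hash partitions (one RPC per partition per flush).
    static double rowsPerRpc(int batchRows, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        return (double) batchRows / partitions;
    }

    public static void main(String[] args) {
        int partitions = 128;
        int[] batchSizes = {1_000, 10_000, 100_000};
        for (int batchRows : batchSizes) {
            System.out.println(batchRows + " rows / " + partitions + " partitions = "
                + rowsPerRpc(batchRows, partitions) + " rows per RPC");
        }
    }
}
```

At 1,000 rows per flush each RPC carries about 8 rows, so the fixed per-RPC overhead dominates. Larger batches or fewer partitions raise the number of rows each RPC amortizes that overhead over.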

Generally I would expect an 8 node cluster (even with HDDs) to be able to sustain several hundred thousand rows/second insert rate. Of course, it depends on the size of the rows and also the primary key you've chosen. If your primary key is generally increasing (such as the Kafka sequence number) then you should have very little compaction and good performance.

-Todd

On Tue, Oct 31, 2017 at 6:20 PM, Zhen Zhang <zhquake@gmail.com> wrote:
Maybe you can increase the number of consumers? In my opinion, inserting from more threads can give better throughput.

2017-10-31 15:07 GMT+08:00 Chao Sun <sunchao@uber.com>:
OK. Thanks! I changed to manual flush mode and it increased to ~15K / sec. :)

Is there any other tuning I can do to further improve this? Also, how much would
SSDs help in this case (upserts only)?

Thanks again,
Chao

On Mon, Oct 30, 2017 at 11:42 PM, Todd Lipcon <todd@cloudera.com> wrote:
If you want to manage batching yourself you can use the manual flush mode. Easiest would be the auto flush background mode.

Todd

On Oct 30, 2017 11:10 PM, "Chao Sun" <sunchao@uber.com> wrote:
Hi Todd,

Thanks for the reply! I used a single Kafka consumer to pull the data.
For Kudu, I was doing something very simple that basically just follows the example here.
Specifically:

loop {
  Insert insert = kuduTable.newInsert();
  PartialRow row = insert.getRow();
  // fill the columns
  kuduSession.apply(insert);
}

I didn't specify the flush mode, so will it pick up AUTO_FLUSH_SYNC as the default?
Should I use MANUAL_FLUSH?

Thanks,
Chao

On Mon, Oct 30, 2017 at 10:39 PM, Todd Lipcon <todd@cloudera.com> wrote:
Hey Chao,

Nice to hear you are checking out Kudu.

What are you using to consume from Kafka and write to Kudu? Is it possible that it is Java code and you are using the SYNC flush mode? That would result in a separate round trip for each record and thus very low throughput.
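[Editor's sketch] A back-of-the-envelope model of the round-trip bound described above. It assumes a single writer issuing round trips strictly one after another, and a trip time that does not grow with batch size; the second assumption does not hold for large batches in practice. `RoundTripModel` and its methods are hypothetical names. The 1.5K rows/sec input is the rate reported in the original question.

```java
final class RoundTripModel {
    // Rows per second when each round trip carries rowsPerTrip rows
    // and takes tripMillis milliseconds, with trips issued serially.
    static double rowsPerSecond(int rowsPerTrip, double tripMillis) {
        return rowsPerTrip * 1000.0 / tripMillis;
    }

    // Round-trip time implied by an observed rate at one row per trip.
    static double impliedTripMillis(double observedRowsPerSecond) {
        return 1000.0 / observedRowsPerSecond;
    }

    public static void main(String[] args) {
        double tripMillis = impliedTripMillis(1_500.0);
        System.out.println("Implied round trip: " + tripMillis + " ms per row");
        // Same trip time, more rows per trip (model only, not a measurement).
        System.out.println("Model rate at 100 rows per trip: "
            + rowsPerSecond(100, tripMillis) + " rows/sec");
    }
}
```

Under these assumptions, 1.5K rows/sec at one row per round trip implies roughly 0.67 ms per trip, which is why batching rows into fewer trips is the first thing to try.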

Todd

On Oct 30, 2017 10:23 PM, "Chao Sun" <sunchao@uber.com> wrote:
Hi,

We are evaluating Kudu (version kudu 1.3.0-cdh5.11.1, revision af02f3ea6d9a1807dcac0ec75bfbca79a01a5cab) on an 8-node cluster.
The data are coming from Kafka at a rate of around 30K / sec, and hash partitioned into 128 buckets. However, with default settings, Kudu can only consume the topics at a rate of around 1.5K / second. This is a direct ingest with no transformation on the data.

Could this be because I was using the default configuration? Also, we are running Kudu on HDDs - could that also be related?

Any help would be appreciated. Thanks.

Best,
Chao







--
Todd Lipcon
Software Engineer, Cloudera



