The Java client has a new way to express scan predicates: the KuduPredicate API. The API matches the corresponding C++ API more closely, and adds support for specifying exclusive, as well as inclusive, range predicates. The existing ColumnRangePredicate API has been deprecated, and will be removed soon. Example of transitioning from the old to the new API:
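A minimal sketch of the transition, assuming an INT32 column schema ageColumn and a scanner builder scannerBuilder (both names are illustrative):

  // Old, deprecated API: bounds are always inclusive.
  ColumnRangePredicate oldPred = new ColumnRangePredicate(ageColumn);
  oldPred.setLowerBound(0);
  oldPred.setUpperBound(100);
  scannerBuilder.addColumnRangePredicate(oldPred);

  // New API: predicates are built with static factory methods, and an
  // exclusive upper bound can be expressed with ComparisonOp.LESS.
  scannerBuilder.addPredicate(KuduPredicate.newComparisonPredicate(
      ageColumn, KuduPredicate.ComparisonOp.GREATER_EQUAL, 0));
  scannerBuilder.addPredicate(KuduPredicate.newComparisonPredicate(
      ageColumn, KuduPredicate.ComparisonOp.LESS, 100));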
The scan optimizations in the server and C++ client, and the new KuduPredicate API in the Java client, are made possible by an overhaul of how predicates are handled internally. A new protobuf message type, ColumnPredicatePB, has been introduced, and will allow more column predicate types to be introduced in the future. If you are interested in contributing to Kudu but don’t know where to start, consider adding a new predicate type; for example, the IS NULL, IS NOT NULL, IN, and LIKE predicate types are currently not implemented.
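Once implemented, a new predicate type would presumably surface through the same factory pattern as the existing comparison predicates; a hypothetical sketch (newIsNotNullPredicate is illustrative and does not exist yet):

  // Hypothetical factory method, mirroring KuduPredicate.newComparisonPredicate.
  scannerBuilder.addPredicate(KuduPredicate.newIsNotNullPredicate(column));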
The single-node Kudu cluster was configured, started, and stopped by a Python script, run_experiments.py, which cycled through several different configurations, completely removing all data in between each iteration. For each Kudu configuration, YCSB was used to load 100M rows of data (each approximately 1KB). YCSB was configured with 16 client threads on the same node. For each configuration, the YCSB log as well as periodic dumps of Tablet Server metrics were captured for later analysis.

Note that in many cases, the 16 client threads were not enough to max out the full performance of the machine. These experiments should not be taken to determine the maximum throughput of Kudu; instead, we are looking at comparing the relative performance of different configuration options.
The first set of experiments runs the YCSB load with the sync_ops=true configuration option. This option means that each client thread will insert one row at a time and synchronously wait for the response before inserting the next row. The lack of batching makes this a good stress test for Kudu’s RPC performance and other fixed per-request costs.

The fact that the requests are synchronous also makes it easy to measure the latency of the write requests. With request batching enabled, latency would be irrelevant.
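In Kudu Java client terms, this style of load roughly corresponds to the default AUTO_FLUSH_SYNC session mode, where each apply() blocks until the server responds; a minimal sketch, with table and column names illustrative:

  KuduSession session = client.newSession();
  // AUTO_FLUSH_SYNC is the default: one synchronous round trip per row.
  session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
  Insert insert = table.newInsert();
  insert.getRow().addLong("key", 1L);
  session.apply(insert);  // returns only once the write is acknowledged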
Average throughput: 31163 ops/sec
The results here are interesting: the throughput starts out around 70K rows/second, but then collapses to nearly zero. After staying near zero for a while, it shoots back up to the original performance, and the pattern repeats many times.
Also note that the 99th percentile latency seems to alternate between close to zero and a value near 500ms. This bimodal distribution led me to grep in the Java source for the magic number 500. Sure enough, I found:
public static final int SLEEP_TIME = 500;
Used in this backoff calculation method (slightly paraphrased here):
long getSleepTimeForRpc(KuduRpc<?> rpc) {
  // TODO backoffs? Sleep in increments of 500 ms, plus some random time up to 50
  return (attemptCount * SLEEP_TIME) + sleepRandomizer.nextInt(50);
}
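With SLEEP_TIME = 500, the first retry backs off roughly 500 ms, the second roughly 1000 ms, and so on, which lines up with the ~500 ms spikes in the 99th percentile latency graph.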
One reason that a client will back off and retry is a SERVER_TOO_BUSY response from the server. This response is used in a number of overload situations. In a write-mostly workload, the most likely situation is that the server is low on memory and thus asking clients to back off while it flushes. Sure enough, when we graph the heap usage over time, as well as the rate of writes rejected due to low memory, we see that this is the case:
plot_ts_metric(data['default'], "heap_allocated", "Heap usage (GB)", 1024*1024*1024)
plot_ts_metric(data['default'], "mem_rejections", "Rejected writes\nper sec")
I then re-ran the workload while watching iostat -dxm 1 to see the write rates across all of the disks. I could see that each of the disks was busy in turn, rather than busy in parallel.
This reminded me that the default way in which Kudu flushes data is as follows:
for each column:
  open a new block on disk to write that column, round-robining across disks
iterate over data:
  append data to the already-open blocks
for each column:
  fsync() the block of data
  close the block
Because Kudu uses buffered writes, the actual appending of data to the open blocks does not generate immediate IO. Instead, it only dirties pages in the Linux page cache. The actual IO is performed with the fsync call at the end. Because Kudu defaults to fsyncing each file in turn from a single thread, this was causing the slow performance identified above.
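The distinction is the same one any buffered file IO exhibits; a minimal Java illustration (the file name and columnData are placeholders):

  FileOutputStream out = new FileOutputStream("block.data");
  out.write(columnData);   // dirties pages in the OS page cache; no immediate device IO
  out.getFD().sync();      // fsync: blocks until the dirty pages reach the disk
  out.close();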
At this point, I consulted with Adar Dembo, who designed much of this code path. He reminded me that we actually have a configuration flag cfile_do_on_finish=flush which changes the code to something resembling the following:
for each column:
  open a new block on disk to write that column, round-robining across disks
iterate over data:
  append data to the already-open blocks
for each column:
  sync_file_range(ASYNC) the dirty data in the block
for each column:
  fsync the block
  close the block
The sync_file_range call here asynchronously enqueues the dirty pages to be written back to the disks, and then the following fsync actually waits for the writeback to be complete. I ran the benchmark for a new configuration with this flag enabled, and plotted the results:
plot_throughput_latency(data['finish=flush'])
Average throughput: 52457 ops/sec
This is already a substantial improvement from the default settings. The overall throughput has increased from 31K ops/second to 52K ops/second (67%), and we no longer see any dramatic drops in performance or increases in 99th percentile. In fact, the 99th percentile stays comfortably below 1ms for the entire test.
Writing a lot of small flushes compared to a small number of large flushes means that the on-disk data is not as well sorted in the optimized workload. An individual write may need to consult up to 20 bloom filters corresponding to previously flushed pieces of data in order to ensure that it is not an insert with a duplicate primary key.

So, how can we address this issue? It turns out that the flush threshold is actually configurable with the flush_threshold_mb flag. I re-ran the workload yet another time with the flush threshold set to 20GB.
plot_throughput_latency(data['finish=flush+20GB-threshold'])
Average throughput: 67123 ops/sec
This gets us another 28% improvement from 52K ops/second up to 67K ops/second (+116% from the default), and we no longer see the troubling downward slope on the throughput graph. Let’s check on the memory and bloom filter metrics again.
The above tests were done with the sync_ops=true YCSB configuration option. However, we expect that for many heavy write situations, the writers would batch many rows together into larger write operations for better throughput.

I wanted to ensure that the recommended configuration changes above also improved performance for this workload. So, I re-ran the same experiments, but with YCSB configured to send batches of 100 insert operations to the tablet server using the Kudu client’s AUTO_FLUSH_BACKGROUND write mode.
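In the Java client, that mode is selected on the session; a minimal sketch (the master address and table name are placeholders):

  KuduClient client = new KuduClient.KuduClientBuilder("master-host:7051").build();
  KuduTable table = client.openTable("usertable");
  KuduSession session = client.newSession();
  // Queue writes client-side and flush them to the server in batches.
  session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
  for (int i = 0; i < 100; i++) {
    Insert insert = table.newInsert();
    insert.getRow().addLong("key", i);
    session.apply(insert);  // returns quickly; the batch flushes in the background
  }
  session.flush();          // wait for outstanding batches
  client.close();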
This time, I compared four configurations:

- the Kudu default settings
- the defaults, but configured with cfile_do_on_finish=flush to increase flush IO performance
- the above, but with the flush threshold configured to 1G
- the above, but with the flush threshold configured to 10G

For these experiments, we don’t plot latencies, since write latencies are meaningless with batching enabled.
Average throughput, in the order of the configurations above: 33319, 80068, 78040, and 82005 ops/sec.
We will likely make these changes in the next Kudu release. In the meantime, users can experiment by adding the following flags to their tablet server configuration:
--cfile_do_on_finish=flush
--flush_threshold_mb=10000
Note that, even if the server hosts many tablets or has less memory than the one used in this test, flushes will still be triggered if the overall memory consumption of the process crosses the configured soft limit. So, configuring a 10GB threshold does not increase the risk of out-of-memory errors.
Since Kudu’s initial release, one of the most commonly requested features has been support for the UPSERT operation, known in some other databases as INSERT ... ON DUPLICATE KEY UPDATE. This operation has the semantics of an INSERT if no row already exists with the provided primary key. Otherwise, it replaces the existing row with the new values.

This week, several developers collaborated to add support for this operation.
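In the Java client, an upsert is constructed just like an insert; a minimal sketch, assuming an open table and session (column names are illustrative):

  Upsert upsert = table.newUpsert();
  PartialRow row = upsert.getRow();
  row.addLong("key", 42L);         // inserted if the key is absent...
  row.addString("value", "new");   // ...otherwise the existing row is replaced
  session.apply(upsert);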
With the C++ client, creating a new table with hash partitions is as simple as calling KuduTableCreator::add_hash_partitions with the columns to hash and the number of buckets to use:
unique_ptr<KuduTableCreator> table_creator(my_client->NewTableCreator());
table_creator->add_hash_partitions({"key_column_a", "key_column_b"}, 3);

The Java client’s equivalent is CreateTableOptions.addHashPartitions:

CreateTableOptions options = new CreateTableOptions();
options.addHashPartitions(Arrays.asList("key_column_a", "key_column_b"), 3);
myClient.createTable("my-table", my_schema, options);
In the examples above, if the hash partition configuration is omitted, the create table operation will fail with the error Table partitioning must be specified using setRangePartitionColumns or addHashPartitions. In the Java client this manifests as a thrown IllegalArgumentException, while in the C++ client it is returned as a Status::InvalidArgument.
When creating Kudu tables with Impala, the formerly optional DISTRIBUTE BY clause is now required:

CREATE TABLE my_table (key_column_a STRING, key_column_b STRING, other_column STRING)
DISTRIBUTE BY HASH (key_column_a, key_column_b) INTO 16 BUCKETS;
The Consensus API has the following main responsibilities:

- Act as a LEADER and replicate writes to a local write-ahead log (WAL) as well as followers in the Raft configuration. For each operation written to the leader, a Raft implementation must keep track of how many nodes have written a copy of the operation being replicated, and whether or not that constitutes a majority. Once a majority of the nodes have written a copy of the data, it is considered committed.

- Act as a FOLLOWER by accepting writes from the leader and preparing them to be eventually committed.

In the context of making multi-master reliable in 1.0, Adar Dembo posted a design document on how to handle permanent master failures. Currently the master’s code is missing some features like remote bootstrap, which makes it possible for a new replica to download a snapshot of the data from the leader replica.
To use replicated masters, a Kudu operator must deploy some number of Kudu masters, providing the hostname and port number of each master in the group via the --master_addresses command line option. For example, each master in a three-node deployment should be started with --master_addresses=<host1:port1>,<host2:port2>,<host3:port3>. In Raft parlance, this group of masters is known as a Raft configuration.
At startup, a Raft configuration of masters will hold a leader election. Clients are also configured with the locations of all masters. Unlike tablet servers, they always communicate with the leader master, as follower masters will reject client requests. To do this, clients must determine which master is the leader before sending the first request, as well as whenever any request fails with a NOT_THE_LEADER error.
Dan Burkert contributed a few patches that repackage the Java client under org.apache.kudu in place of org.kududb. This was done in a backward-incompatible way, meaning that import statements will have to be modified in existing Java code to compile against a newer Kudu JAR version (from 0.10.0 onward). This stems from a discussion initiated in May. It won’t have an impact on C++ or Python users, and it isn’t affecting wire compatibility.
Still on the Java side, J-D Cryans pushed a patch that completely changes how exceptions are managed. Before this change, users had to introspect generic Exception objects, making it a guessing game and discouraging good error handling. Now, the synchronous client’s methods throw KuduException, which packages a Status object that can be interrogated. This is very similar to how the C++ API works.

Existing code that uses the new Kudu JAR should still compile, since this change replaces the generic Exception with a more specific KuduException. Error handling done by string-matching the exception messages should now use the provided Status object.
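A minimal sketch of the new pattern (the Status accessors mirror the C++ Status API):

  try {
    session.apply(insert);
  } catch (KuduException e) {
    Status status = e.getStatus();
    if (status.isNotFound()) {
      // e.g. the table or tablet has gone away
    } else if (status.isTimedOut()) {
      // back off and retry, or surface the failure
    }
  }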
Alexey Serbin’s patch that adds Doxygen-based documentation was pushed, and the new API documentation for C++ developers will be published with the next release.
Todd has made many improvements to the ksck tool over the last week. Building upon Will Berkeley’s WIP patch for KUDU-1516, ksck can now detect more problematic situations, like if a tablet doesn’t have a majority of replicas on live tablet servers, or if those replicas aren’t in a good state. ksck is also now faster when run against a large cluster with a lot of tablets, among other improvements.
Since Kudu’s initial release, it has included separate binaries for different administrative or operational tools (e.g. kudu-ts-cli, kudu-ksck, kudu-fs_dump, log-dump, etc). Despite having similar usage, these tools don’t share much code, and the separate statically linked binaries make the Kudu packages take more disk space than strictly necessary.
Adar’s work has introduced a new top-level kudu binary which exposes a set of subcommands, much like the git and docker binaries with which readers may be familiar. For example, a new tool he has built for dumping peer identifiers from a tablet’s consensus metadata is triggered using kudu tablet cmeta print_replica_uuids.

This new tool will be available in the upcoming 0.10.0 release; however, migration of the existing tools to the new infrastructure has not yet been completed. We expect that by Kudu 1.0, the old tools will be removed in favor of more subcommands of the kudu tool.
Todd Lipcon picked up the work started by David Alves in July to provide …

… was committed this week. The docs will be published as part of the 0.10.0 release.

Alexey also continued work on implementing the AUTO_FLUSH_BACKGROUND write mode for the C++ client. This feature makes it easier to implement high-throughput ingest using the C++ API by automatically handling the batching and flushing of writes based on a configurable buffer size.
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.sources.source1.type = exec
agent1.sources.source1.command = vmstat 1
agent1.sources.source1.channels = channel1

agent1.channels.channel1.type = memory

agent1.sinks.sink1.type = org.apache.kudu.flume.sink.KuduSink
agent1.sinks.sink1.masterAddresses = localhost
agent1.sinks.sink1.tableName = stats
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.batchSize = 50
agent1.sinks.sink1.producer = org.apache.kudu.flume.sink.SimpleKuduEventProducer
We define a source called source1 which simply executes a vmstat command to continuously generate virtual memory statistics for the machine and queue events into an in-memory channel1 channel, which in turn is used for writing these events to a Kudu table called stats. We are using org.apache.kudu.flume.sink.SimpleKuduEventProducer as the producer. SimpleKuduEventProducer is
the built-in and default producer, but it’s implemented as a showcase for how to write Flume
events into Kudu tables. For any serious functionality we’d have to write a custom producer. We
need to make this producer and the KuduSink class available to Flume. We can do that by simply copying the kudu-flume-sink-<VERSION>.jar jar file from the Kudu distribution to the $FLUME_HOME/plugins.d/kudu-sink/lib directory in the Flume installation. The jar file contains KuduSink and all of its dependencies (including Kudu java client classes).
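Concretely, that amounts to something like the following (the version number is whatever your Kudu distribution ships):

  mkdir -p $FLUME_HOME/plugins.d/kudu-sink/lib
  cp kudu-flume-sink-<VERSION>.jar $FLUME_HOME/plugins.d/kudu-sink/lib/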
At a minimum, the Kudu Flume Sink needs to know where the Kudu masters are (agent1.sinks.sink1.masterAddresses = localhost) and which Kudu table should be used for writing Flume events to (agent1.sinks.sink1.tableName = stats). The Kudu Flume Sink doesn’t create this table; it has to be created before the Kudu Flume Sink is started.
You may also notice the batchSize parameter. Batch size is used for batching up to that many Flume events and flushing the entire batch in one shot. Tuning batchSize properly can have a huge impact on ingest performance of the Kudu cluster.
Let’s take a look at the source code for the built-in producer class:
public class SimpleKuduEventProducer implements KuduEventProducer {
  private byte[] payload;
  private KuduTable table;
  private String payloadColumn;

  public SimpleKuduEventProducer(){
  }

  @Override
  public void configure(Context context) {
    payloadColumn = context.getString("payloadColumn","payload");
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }

  @Override
  public void initialize(Event event, KuduTable table) {
    this.payload = event.getBody();
    this.table = table;
  }

  @Override
  public List<Operation> getOperations() throws FlumeException {
    try {
      Insert insert = table.newInsert();
      PartialRow row = insert.getRow();
      row.addBinary(payloadColumn, payload);

      return Collections.singletonList((Operation) insert);
    } catch (Exception e){
      throw new FlumeException("Failed to create Kudu Insert object!", e);
    }
  }

  @Override
  public void close() {
  }
}
SimpleKuduEventProducer implements the org.apache.kudu.flume.sink.KuduEventProducer interface, which itself looks like this:
public interface KuduEventProducer extends Configurable, ConfigurableComponent {
  /**
   * Initialize the event producer.
   * @param event to be written to Kudu
   * @param table the KuduTable object used for creating Kudu Operation objects
   */
  void initialize(Event event, KuduTable table);

  /**
   * Get the operations that should be written out to Kudu as a result of this
   * event. This list is written to Kudu using the Kudu client API.
   * @return List of {@link org.kududb.client.Operation} which
   * are written as such to Kudu
   */
  List<Operation> getOperations();

  /*
   * Clean up any state. This will be called when the sink is being stopped.
   */
  void close();
}
public void configure(Context context) is called when an instance of our producer is instantiated by the KuduSink. SimpleKuduEventProducer’s implementation looks for a producer parameter named payloadColumn and uses its value (“payload” if not overridden in the Flume configuration file) as the column which will hold the value of the Flume event payload. If you recall from above, we had configured the KuduSink to listen for events generated from the vmstat command. Each output row from that command will be stored as a new row containing a payload column in the stats table. SimpleKuduEventProducer does not have any configuration parameters, but if it had any we would define them by prefixing them with producer. (agent1.sinks.sink1.producer.parameter1 for example).
The main producer logic resides in the public List<Operation> getOperations() method. In SimpleKuduEventProducer’s implementation we simply insert the binary body of the Flume event into the Kudu table. Here we call Kudu’s newInsert() to initiate an insert, but we could have used Upsert if updating an existing row was also an option; in fact, there’s another producer implementation available for doing just that: SimpleKeyedKuduEventProducer. Most probably you will need to write your own custom producer in the real world, but you can base your implementation on the built-in ones.
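As an illustration of what such a custom producer might look like, here is a hypothetical variant that splits each event body into comma-separated fields and writes them to a configurable list of string columns (the class name and the columns parameter are made up for this sketch):

  public class CsvKuduEventProducer implements KuduEventProducer {
    private byte[] payload;
    private KuduTable table;
    private String[] columns;

    @Override
    public void configure(Context context) {
      // Configured as agent1.sinks.sink1.producer.columns = col1,col2,...
      columns = context.getString("columns", "payload").split(",");
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, KuduTable table) {
      this.payload = event.getBody();
      this.table = table;
    }

    @Override
    public List<Operation> getOperations() throws FlumeException {
      try {
        String[] fields =
            new String(payload, java.nio.charset.StandardCharsets.UTF_8).split(",");
        Insert insert = table.newInsert();
        PartialRow row = insert.getRow();
        // Write one field per configured column; assumes STRING columns.
        for (int i = 0; i < columns.length && i < fields.length; i++) {
          row.addString(columns[i], fields[i]);
        }
        return Collections.singletonList((Operation) insert);
      } catch (Exception e) {
        throw new FlumeException("Failed to create Kudu Insert object!", e);
      }
    }

    @Override
    public void close() {
    }
  }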