kudu-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Franco Venturi <fvent...@comcast.net>
Subject Re: Change Data Capture (CDC) with Kudu
Date Sat, 23 Sep 2017 18:33:20 GMT

Adar and Mike, 
first of all thanks for the replies and interesting suggestions. 

Over the last couple of days I came up with a different approach that wouldn't require to
mess with the Kudu WALs (or much of the internals). 

The basic idea would be to capture the changes (insert, deletes, schema changes, etc) at the
input, i.e. when they enter Kudu, instead of when they get written to the actual storage (i.e.
WALs, data files). 
Let me explain better: since all the interactions between the Kudu services and their clients
is via a set of well defined RPCs (see here https://github.com/cloudera/kudu/blob/master/docs/design-docs/rpc.md),
my idea would be to interpose a thin transparent proxy layer between the clients and the Kudu
services that basically sends everything though unchanged, but also captures the "interesting"
RPCs and writes them aside to be replayed on the secondary instance. 

This approach would have several advantages including capturing changes at a much higher 'logical'
level (while mining the WALs would probably result in a more 'physical' view of the changes).

A few other advantages I thought of would be: 
- the RPC layer should probably be a much more stable interface than the internals of the
WALs (since the clients interact with it and hence changing the RPC interfaces would cause
a lot more problems than changing some of the Kudu internals) 
- the RPC layer has already a well defined serialization layout via Protobuf messages, and
therefore it would very simple to just 'dump' to disk the full content of interesting RPC
- the transparent RPC proxy could be written as a standalone independent process that listens
to the Kudu ports (7050, 7051) and sends everything back to the 'real' Kudu ports (which would
have to be changed to say 17050 and 17051) 
- being a totally independent process it would not require any changes to the Kudu code base
(or even the Kudu binary installation in case of a distribution), and therefore could be transparently
"added" to an existing Kudu installation (if you need vendor support for a Kudu problem, I
imagine you would have to first make sure you can replicate the issue without this proxy)

- as a standalone application, it could also be marketed as a premium 'add-on' feature by
a vendor, since most of the customers asking for this kind of replication are at the enterprise
level, who would defintely see the added value offered by this tool 
- since the list of changes is sequential in nature (it could be ordered by 'call_id' value),
it could be appended to some sort of "transaction" log file (which could be stored in HDFS,
since these would be sequential writes); the file would be periodically sync'd up to the secondary
instance (for instance via 'rsync' or by taking advantage of BDR if the file is stored in
- on the secondary site another process (acting as a Kudu client) would replay the RPC calls
to the secondary Kudu instance following the 'call_id' sequence; given the strict integration
between the 'replayer' process and the 'transparent RPC proxy' on the primary site, they could
both be part of the same application binary (which could possibly be used in a bidirectional
way for an active-active scenario, where an organization may want to have some sort of multi-master
- the secondary 'replayer' process too wouldn't require any changes to the existing Kudu installation
as it would act just as an external RPC client from the Kudu point of view 

Of course the devil is in the details, and a few issues I can see are: 
- how to deal with errors on either side (for instance disk full on just one of the two instances);
the number of error combinations could be potentially quite large 
- keeping track of the tablet id's on both sides (I imagine they would be different, so the
'replayer' process would have to 'map' the original tablet id's - and probably several other
values - to the matching values on the secondary instance) 
- integration with Kerberos and security issues in general 

This morning I spent a few minutes looking at the list of Kudu RPCs (I pulled the source code
from git and ran this command: find . -name \*.proto | grep -v 'rtest.proto' | xargs cat |
grep '^ rpc ' | sort) and this is an initial list of the RPC methods that I think would have
to captured: 
- AlterSchema 
- AlterTable 
- CreateTable 
- CreateTablet 
- DeleteTable 
- DeleteTablet 
- Write 

I think the first step would be to just write a fully transparent RPC proxy for Kudu that
just logs each RPC method through it, to see how it goes. 


----- Original Message -----

From: "Mike Percy" <mpercy@apache.org> 
To: user@kudu.apache.org 
Sent: Friday, September 22, 2017 5:32:41 PM 
Subject: Re: Change Data Capture (CDC) with Kudu 

I just realized that I suggested something you mentioned in your initial email. My mistake
for not reading through to the end. It is probably the least-worst approach right now and
it's probably what I would do if I were you. 


On Fri, Sep 22, 2017 at 2:29 PM, Mike Percy < mpercy@apache.org > wrote: 

CDC is something that I would like to see in Kudu but we aren't there yet with the underlying
support in the Raft Consensus implementation. Once we have higher availability re-replication
support (KUDU-1097) we will be a bit closer for a solution involving traditional WAL streaming
to an external consumer because we will have support for non-voting replicas. But there would
still be plenty of work to do to support CDC after that, at least from an API perspective
as well as a WAL management perspective (how long to keep old log files). 

That said, what you really are asking for is a streaming backup solution, which may or may
not use the same mechanism (unfortunately it's not designed or implemented yet). 

As an alternative to Adar's suggestions, a reasonable option for you at this time may be an
incremental backup. It takes a little schema design to do it, though. You could consider doing
something like the following: 

    1. Add a last_updated column to all your tables and update the column when you change
the value. Ideally monotonic across the cluster but you could also go with local time and
build in a "fudge factor" when reading in step 2 
    2. Periodically scan the table for any changes newer than the previous scan in the last_updated
column. This type of scan is more efficient to do in Kudu than in many other systems. With
Impala you could run a query like: select * from table1 where last_updated > $prev_updated;

    3. Dump the results of this query to parquet 
    4. Use distcp to copy the parquet files over to the other cluster periodically (maybe
you can throttle this if needed to avoid saturating the pipe) 
    5. Upsert the parquet data into Kudu on the remote end 

Hopefully some workaround like this would work for you until Kudu has a reliable streaming
backup solution. 

Like Adar said, as an Apache project we are always open to contributions and it would be great
to get some in this area. Please reach out if you're interested in collaborating on a design.


On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo < adar@cloudera.com > wrote: 



Thanks for the detailed description of your problem. 

I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems like a path fraught
with land mines. Kudu GCs WAL segments aggressively so I'd be worried about a listening mechanism
missing out on some row operations. Plus the WAL is Raft-specific as it includes both REPLICATE
messages (reflecting a Write RPC from a client) and COMMIT messages (written out when a majority
of replicas have written a REPLICATE); parsing and making sense of this would be challenging.
Perhaps you could build something using Linux's inotify system for receiving file change notifications,
but again I'd be worried about missing certain updates. 

Another option is to replicate the data at the OS level. For example, you could periodically
rsync the entire cluster onto a standby cluster. There's bound to be data loss in the event
of a failover, but I don't think you'll run into any corruption (though Kudu does take advantage
of sparse files and hole punching, so you should verify that any tool you use supports that).

Disaster Recovery is an oft-requested feature, but one that Kudu developers have been unable
to prioritize yet. Would you or your someone on your team be interested in working on this?

On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi < fventuri@comcast.net > wrote: 


We are planning for a 50-100TB Kudu installation (about 200 tables or so). 

One of the requirements that we are working on is to have a secondary copy of our data in
a Disaster Recovery data center in a different location. 

Since we are going to have inserts, updates, and deletes (for instance in the case the primary
key is changed), we are trying to devise a process that will keep the secondary instance in
sync with the primary one. The two instances do not have to be identical in real-time (i.e.
we are not looking for synchronous writes to Kudu), but we would like to have some pretty
good confidence that the secondary instance contains all the changes that the primary has
up to say an hour before (or something like that). 

So far we considered a couple of options: 
- refreshing the seconday instance with a full copy of the primary one every so often, but
that would mean having to transfer say 50TB of data between the two locations every time,
and our network bandwidth constraints would prevent to do that even on a daily basis 
- having a column that contains the most recent time a row was updated, however this column
couldn't be part of the primary key (because the primary key in Kudu is immutable), and therefore
finding which rows have been changed every time would require a full scan of the table to
be sync'd. It would also rely on the "last update timestamp" column to be always updated by
the application (an assumption that we would like to avoid), and would need some other process
to take into accounts the rows that are deleted. 

Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of 'Change Data Capture'
mechanism where only the 'deltas' are captured and applied to the secondary instance, we were
wondering if there's any way in Kudu to achieve something like that (possibly mining the WALs,
since my understanding is that each change gets applied to the WALs first). 

Franco Venturi 



View raw message