From: Mike Percy
Date: Fri, 22 Sep 2017 14:32:41 -0700
Subject: Re: Change Data Capture (CDC) with Kudu
To: user@kudu.apache.org
Reply-To: user@kudu.apache.org

Franco,

I just realized that I suggested something you had already mentioned in your initial email. My mistake for not reading through to the end. It is probably the least-worst approach right now, and it's probably what I would do if I were you.

Mike

On Fri, Sep 22, 2017 at 2:29 PM, Mike Percy <mpercy@apache.org> wrote:

> CDC is something that I would like to see in Kudu, but we aren't there yet with the underlying support in the Raft consensus implementation. Once we have higher-availability re-replication support (KUDU-1097), we will be a bit closer to a solution involving traditional WAL streaming to an external consumer, because we will then have support for non-voting replicas. But there would still be plenty of work to do to support CDC after that, both from an API perspective and from a WAL management perspective (how long to keep old log files).
>
> That said, what you are really asking for is a streaming backup solution, which may or may not use the same mechanism (unfortunately it's not designed or implemented yet).
>
> As an alternative to Adar's suggestions, a reasonable option for you at this time may be an incremental backup. It takes a little schema design to do it, though. You could consider doing something like the following:
>
> 1. Add a last_updated column to all your tables and update the column whenever you change a value. Ideally the timestamps would be monotonic across the cluster, but you could also go with local time and build in a "fudge factor" when reading in step 2.
> 2. Periodically scan the table for any changes in the last_updated column that are newer than the previous scan. This type of scan is more efficient to do in Kudu than in many other systems. With Impala you could run a query like: select * from table1 where last_updated > $prev_updated;
> 3. Dump the results of this query to Parquet.
> 4. Use distcp to periodically copy the Parquet files over to the other cluster (you could throttle this if needed to avoid saturating the pipe).
> 5. Upsert the Parquet data into Kudu on the remote end.
>
> Hopefully some workaround like this would work for you until Kudu has a reliable streaming backup solution.
>
> Like Adar said, as an Apache project we are always open to contributions, and it would be great to get some in this area. Please reach out if you're interested in collaborating on a design.
>
> Mike
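A minimal sketch of what steps 1-3 and 5 above could look like in Impala SQL. The table name table1, the delta table table1_delta, and the prev_updated checkpoint are illustrative assumptions, with prev_updated passed as a quoted timestamp string via impala-shell --var=prev_updated=... ; step 4, the distcp copy of the Parquet files, happens outside SQL, and table1_delta would have to be re-created on the DR cluster over the copied files before the upsert:

    -- Step 1 (one time): add the change-tracking column to the Kudu table.
    ALTER TABLE table1 ADD COLUMNS (last_updated TIMESTAMP);

    -- Steps 2-3 (every cycle, on the primary cluster): materialize the rows
    -- changed since the previous run as a Parquet table that distcp can ship.
    CREATE TABLE table1_delta STORED AS PARQUET AS
    SELECT * FROM table1
    WHERE last_updated > ${var:prev_updated};

    -- Step 5 (every cycle, on the DR cluster, after the distcp of step 4):
    -- fold the copied deltas back into the remote Kudu table.
    UPSERT INTO table1
    SELECT * FROM table1_delta;

Each run would also record the max(last_updated) seen in the exported delta as the next run's prev_updated, and drop or rename table1_delta before the next cycle.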
> On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo <adar@cloudera.com> wrote:
>
>> Franco,
>>
>> Thanks for the detailed description of your problem.
>>
>> I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems like a path fraught with land mines. Kudu GCs WAL segments aggressively, so I'd be worried about a listening mechanism missing some row operations. Plus, the WAL is Raft-specific: it includes both REPLICATE messages (reflecting a Write RPC from a client) and COMMIT messages (written out when a majority of replicas have written a REPLICATE), so parsing and making sense of it would be challenging. Perhaps you could build something using Linux's inotify system for receiving file change notifications, but again I'd be worried about missing certain updates.
>>
>> Another option is to replicate the data at the OS level. For example, you could periodically rsync the entire cluster onto a standby cluster. There's bound to be data loss in the event of a failover, but I don't think you'll run into any corruption (though Kudu does take advantage of sparse files and hole punching, so you should verify that any tool you use supports those).
>>
>> Disaster recovery is an oft-requested feature, but one that the Kudu developers have been unable to prioritize yet. Would you or someone on your team be interested in working on this?
>>
>> On Thu, Sep 21, 2017 at 7:12 PM, Franco Venturi <fventuri@comcast.net> wrote:
>>
>>> We are planning for a 50-100 TB Kudu installation (about 200 tables or so).
>>>
>>> One of the requirements that we are working on is to have a secondary copy of our data in a disaster recovery data center in a different location.
>>>
>>> Since we are going to have inserts, updates, and deletes (for instance when the primary key of a row is changed), we are trying to devise a process that will keep the secondary instance in sync with the primary one. The two instances do not have to be identical in real time (i.e. we are not looking for synchronous writes to Kudu), but we would like to have pretty good confidence that the secondary instance contains all the changes that the primary had up to, say, an hour before (or something like that).
>>>
>>> So far we have considered a couple of options:
>>> - Refreshing the secondary instance with a full copy of the primary one every so often, but that would mean transferring, say, 50 TB of data between the two locations every time, and our network bandwidth constraints would prevent us from doing that even on a daily basis.
>>> - Having a column that contains the most recent time a row was updated. However, this column couldn't be part of the primary key (because the primary key in Kudu is immutable), so finding which rows have changed would require a full scan of the table being sync'd every time. It would also rely on the "last update timestamp" column always being updated by the application (an assumption that we would like to avoid), and it would need some other process to take into account the rows that are deleted.
>>>
>>> Since many of today's RDBMSs (Oracle, MySQL, etc.) allow for some sort of 'Change Data Capture' mechanism, where only the 'deltas' are captured and applied to the secondary instance, we were wondering if there is any way in Kudu to achieve something like that (possibly by mining the WALs, since my understanding is that each change gets applied to the WALs first).
>>>
>>> Thanks,
>>> Franco Venturi
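For the deleted rows that Franco's second option leaves unaccounted for, one possibility that stays within the same last_updated scheme is a soft-delete flag, sketched below under the assumption that the application can be changed to flag rows instead of deleting them, so that deletions show up in the incremental scan like any other update:

    -- Assumed soft-delete variant: the application sets is_deleted = true and
    -- bumps last_updated instead of physically deleting the row.
    ALTER TABLE table1 ADD COLUMNS (is_deleted BOOLEAN);

    -- On the DR cluster, after upserting a batch of deltas:
    DELETE FROM table1 WHERE is_deleted = true;

The flagged rows would eventually need a separate purge on the primary cluster as well.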