From: Adam Bellemare <adam.bellemare@gmail.com>
Date: Wed, 24 Jul 2019 08:42:15 -0400
Subject: Re: KIP-382 + Kafka Streams Question
To: Ryanne Dolan
Cc: dev@kafka.apache.org
Hi Ryanne

> Lemme know if I haven't answered this clearly.

Nope, this was very helpful. Thank you!

> A single "stream" can come from multiple input topics

I overlooked that - I was thinking of simply using the StreamsBuilder.table()
functionality instead, but that function doesn't support a Collection of
topics. Since the topics would be co-partitioned by definition, wouldn't the
event dispatcher in PartitionGroup (the priority queue and stream-time
ordering) ensure that the topics are processed in increasing stream-time
order?

Alternately, I suppose this could be a case where it is a good idea to have
the timestamp of the event within the event's value payload, such that:

    builder.stream(Set.of("userEntity", "primary.userEntity"))
           .groupByKey()
           .reduce((oldVal, newVal) -> newVal)

can allow us to materialize the latest state for a given key.
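For concreteness, a rough sketch of the timestamp-aware variant I'm
picturing - untested, and UserEntity / userEntitySerde are placeholders for
whatever the actual value type and serde turn out to be:

    import java.util.Set;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();

    // Merge the local topic with its MM2 replica and keep, per key, the
    // record with the greatest timestamp embedded in the value payload.
    KTable<String, UserEntity> latest =
        builder.stream(Set.of("userEntity", "primary.userEntity"),
                       Consumed.with(Serdes.String(), userEntitySerde))
               .groupByKey()
               .reduce((current, incoming) ->
                   incoming.timestamp >= current.timestamp ? incoming : current);

The reduce() is what would make the merge order-insensitive: whichever topic
a record arrives from, only the newest value per key survives.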
Thanks Ryanne, this has been a very helpful discussion for me. We are
prototyping the usage of MM2 internally at the moment, in anticipation of
its release in 2.4, and want to ensure we have our replication + recovery
strategies sorted out.

Adam

On Tue, Jul 23, 2019 at 7:26 PM Ryanne Dolan wrote:

> Adam, I think we are converging :)
>
> > "userEntity"...where I only want the latest emailAddress (basic
> > materialization) to send an email on account password update.
>
> Yes, you want all "userEntity" data on both clusters. Each cluster will
> have "userEntity" and the remote counterpart
> "secondary/primary.userEntity", as in my example (1). The send-email part
> can run on either cluster (but not both, to avoid duplicate emails),
> subscribing to both "userEntity" and "secondary/primary.userEntity". For
> DR, you can migrate this app between clusters via offset translation and
> the kafka-streams-application-reset tool.
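> To make "offset translation" concrete, here is a sketch against the
> RemoteClusterUtils API proposed in KIP-382 (the bootstrap address and
> group id are illustrative). It connects to the secondary cluster and asks
> for the group's offsets as translated from the primary cluster's alias:
>
>     import java.time.Duration;
>     import java.util.Map;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>     import org.apache.kafka.connect.mirror.RemoteClusterUtils;
>
>     // Connection config points at the cluster we are failing over TO.
>     Map<String, Object> props = Map.of("bootstrap.servers", "secondary:9092");
>     Map<TopicPartition, OffsetAndMetadata> translated =
>         RemoteClusterUtils.translateOffsets(
>             props, "primary", "send-email-app", Duration.ofMinutes(1));
>     // Seed the group's offsets on the secondary cluster with "translated",
>     // then reset and restart the app there.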
> Then, you want a materialize-email-table app running in _both_ clusters,
> so that the latest emails are readily available in RocksDB from either
> cluster. This also subscribes to both "userEntity" and
> "secondary/primary.userEntity" s.t. records originating from either
> cluster are processed.
>
> (Equivalently, send-email and materialize-email-table could be parts of
> the same Streams app, just configured differently, e.g. with send-email
> short-circuited in all but one cluster.)
>
> Under normal operation, your userEntity events are sent to the primary
> cluster (topic: userEntity), processed there via materialize-email-table
> and send-email, and replicated to the secondary cluster (topic:
> primary.userEntity) via MM2. When primary goes down, your producers
> (whatever is sending userEntity events) can fail over to the secondary
> cluster (topic: userEntity). This can happen in real time, i.e. as soon as
> the producer detects an outage, or via a load balancer with health checks
> etc. So under normal operation, you have all userEntity events in both
> clusters, and both clusters are available for producing to.
>
> N.B. this is not dual-ingest, which would require you to always produce
> directly to both clusters. It's active/active, b/c you can produce to
> either cluster at any point in time, and the effect is the same.
>
> > Q1) Where does the producer write its data to if the primary cluster
> > is dead?
>
> With active/active like this, you can send to either cluster.
>
> > Q2) How does a Kafka Streams application materialize state from two
> > topics?
>
> A Streams app can subscribe to multiple topics. A single "stream" can come
> from multiple input topics (see:
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
> )
>
> Likewise, a KTable can be materialized from multiple source topics -- in
> this case, userEntity, primary.userEntity and/or secondary.userEntity. You
> can think of these as parts of a "virtual topic", as you described.
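> For example, a sketch of one stream over the local topic and any MM2
> replicas (the pattern just needs to match whatever <alias>.<topic> names
> your replication flows produce):
>
>     import java.util.regex.Pattern;
>     import org.apache.kafka.streams.StreamsBuilder;
>
>     StreamsBuilder builder = new StreamsBuilder();
>     // Matches "userEntity" as well as "primary.userEntity" or
>     // "secondary.userEntity".
>     builder.stream(Pattern.compile("([a-zA-Z0-9_-]+\\.)?userEntity"));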
> > (loaded question, I know)
>
> There is one caveat I can think of: there is no ordering guarantee across
> different topics in the same stream, so materialization could be
> inconsistent between the two clusters if, say, the same user's email was
> changed to different values at the same millisecond in both clusters. This
> may or may not be a problem.
>
> > Q3) ... recommendations on how to handle replication/producing of
> > entity-data (ie: userEntity) across multiple clusters...
>
> Lemme know if I haven't answered this clearly.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare wrote:
>
>> Hi Ryanne
>>
>> Thanks for the clarifications! Here is one of my own, as I think it's the
>> biggest stumbling block in my description:
>>
>> > What is "table" exactly? I am interpreting this as a KTable changelog
>> > topic
>>
>> "table" is not a KTable changelog topic, but simply entity data that is
>> to be materialized into a table - for example, relational data captured
>> from Kafka Connect. I should have named this "stateful-data" or something
>> less ambiguous and provided an explicit definition. Note that non-KStreams
>> applications will also regularly use this entity data to materialize
>> their own tables, but it in itself is not a KTable internal changelog.
>>
>> Per your example 1, let's name this topic "userEntity". It could be a
>> (key, value) pair of (userId, emailAddress), where I only want the latest
>> emailAddress (basic materialization) to send an email on account password
>> update. I only want to run the application against one Kafka cluster, and
>> because I don't want to use dual-ingest, I am running that application
>> only on the cluster where the data is being sent (Primary Cluster). In a
>> scenario where all replication is working correctly I could also run this
>> off the Secondary cluster's replica, "primary.userEntity".
>>
>> > Yes, that's something like "dual ingest", which I would not recommend.
>>
>> Agreed. I do not want to use dual ingest.
>>
>> > Secondary cluster:
>> > Topics: events, primary.events, table-changelog
>> > App subscription: events, primary.events
>> > App output: table-changelog
>>
>> Is the "events" topic dual ingest, since it exists in the Primary cluster
>> with the exact same name?
>>
>> The whole scenario can be boiled down into the following:
>> 1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
>> 2) I want to publish it into an Active-Active cluster setup without using
>> dual-ingest
>> 3) I want to materialize the data into a single table for an application
>> consuming from a single cluster (Kafka Streams or not)
>> 4) I want to be able to fail over and rebuild the materialized state
>> using the data I have replicated.
>>    - If all of the entity data is produced to each cluster (dual-ingest)
>>      then it is trivial to fail over and rebuild the materialized table.
>>    - If the data is only produced to Primary and only replicated to
>>      Secondary, at a failover I would need to consume from the replicated
>>      topic.
>>
>> Q1) Where does the producer write its data to if the primary cluster is
>> dead?
>> It seems to me that it must then write its data to the only remaining
>> cluster. This would then put the entity data in two topics as I had
>> originally outlined, as below:
>>
>> Secondary Cluster: (Live) (renamed "table" to "userEntity")
>> Topic: "primary.userEntity" (contains data from T = 0 to T = n)
>> Topic: "userEntity" (contains data from T = n+1 to now, from the
>> failed-over producer)
>>
>> Q2) How does a Kafka Streams application materialize state from two
>> topics? (loaded question, I know)
>> Since I know this isn't built in, is there some sort of technique or
>> system that you use to allow for a single virtual topic made up of many
>> logical topics?
>>
>> Q3) Do you have any recommendations on how to handle
>> replication/producing of entity-data (ie: userEntity) across multiple
>> clusters, such that an application may correctly (or even near-correctly)
>> materialize state after a failover like the one I described above?
>> This is really the golden question. We're currently developing our
>> Active-Passive approach, but we want to be prepared for scenarios where
>> we have multiple clusters with entity-replication between clusters.
>>
>> Thanks Ryanne!
>>
>> On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan wrote:
>>
>>> Adam,
>>>
>>> > I think we have inconsistent definitions of Active-Active
>>>
>>> Yes, this terminology gets thrown around a lot. IMO "active" means both
>>> producers and consumers are using a cluster under normal operation --
>>> not just during outages, and not just by something like MM2. (Obviously,
>>> MM2 has producers and consumers, but they don't count here.) Conversely,
>>> "standby" or "backup" means that data is being written by a producer,
>>> but it isn't being consumed under normal operation. I qualify this
>>> definition with IMO, as I don't think there is strong consensus here.
>>>
>>> I'll also add a caveat about "under normal operation". An active/active
>>> architecture does not necessarily mean that you use both clusters in the
>>> same way all the time -- only that you _could_. You could load-balance
>>> 50/50 of your traffic between two clusters, or you could direct 100% to
>>> one and 0% to the other, e.g. if one is farther away or has fewer hw
>>> resources. But the architecture remains the same (and certainly, MM2
>>> doesn't care about this detail).
>>>
>>> > The producer is only producing to one cluster (primary) and one topic
>>> > (topic "table"), and the other cluster (secondary) contains only a
>>> > replication of the data via MM2 ("primary.table").
>>>
>>> That, by definition, is not active/active.
>>>
>>> > What you seemed to be proposing is that the producer's "table" data is
>>> > sent fully to each cluster, such that the state can be materialized as
>>> > a KTable in each application running on each cluster.
>>>
>>> Correct.
>>>
>>> > This wouldn't require MM2 at all, so I'm not sure if this is what you
>>> > advocated.
>>>
>>> You could use a dual-ingest method and send all your data to both
>>> clusters, which would not require MM2. There are many issues with this
>>> approach, primarily wrt consistency and efficiency.
>>>
>>> > The trivial solution seems to be to make your producers produce all
>>> > stateful data (topic "table") to each cluster, which makes MM2
>>> > unnecessary, but can also lead to data inconsistencies so it's not
>>> > exactly foolproof.
>>>
>>> Yes, that's something like "dual ingest", which I would not recommend.
>>>
>>> > StreamsAppPrimary is consuming from ("table")
>>>
>>> What is "table" exactly? I am interpreting this as a KTable changelog
>>> topic, in which case "table" is an output topic of some streams app,
>>> i.e. the app producing the change events. _This_ is the app I mean to
>>> suggest you run on both clusters. Then, "table" will appear on both
>>> clusters (no "primary.table").
>>>
>>> The app that is creating the "table" changelog would be processing
>>> events from some other topic, say "events". Then, this is what I
>>> recommend:
>>>
>>> Primary cluster:
>>> Topics: events, secondary.events, table-changelog
>>> App subscription: events, secondary.events
>>> App output: table-changelog
>>>
>>> Secondary cluster:
>>> Topics: events, primary.events, table-changelog
>>> App subscription: events, primary.events
>>> App output: table-changelog
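>>> A sketch of that app as it would run on the primary cluster -- the
>>> secondary instance is identical except it subscribes to
>>> "primary.events", and the count() here stands in for whatever
>>> aggregation actually builds your table:
>>>
>>>     import java.util.Set;
>>>     import org.apache.kafka.common.serialization.Serdes;
>>>     import org.apache.kafka.streams.StreamsBuilder;
>>>     import org.apache.kafka.streams.kstream.Consumed;
>>>     import org.apache.kafka.streams.kstream.Produced;
>>>
>>>     StreamsBuilder builder = new StreamsBuilder();
>>>     builder.stream(Set.of("events", "secondary.events"),
>>>                    Consumed.with(Serdes.String(), Serdes.String()))
>>>            .groupByKey()
>>>            .count()       // materializes state in RocksDB
>>>            .toStream()
>>>            .to("table-changelog",
>>>                Produced.with(Serdes.String(), Serdes.Long()));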
>>> With this arrangement, the app on either cluster will have built up
>>> state in RocksDB based on events from both clusters.
>>>
>>> Now, it seems you also want a second app to process this changelog. I
>>> can see a few scenarios:
>>>
>>> 1) you want to take some external action based on records in the table
>>> changelog, e.g. to send an email every time a password is updated. In
>>> this case, you don't want this app running in both clusters, as you'd
>>> get two emails. So you could run it in one cluster and use offset
>>> translation to migrate during failover. The send-email app is stateless,
>>> so you just need to translate and reset offsets (there is no internal
>>> state to rebuild).
>>>
>>> 2) you want to use the table changelog in a stateful but non-effecting
>>> way, e.g. by keeping a running count of records. This app, like the
>>> first, can be run in both clusters.
>>>
>>> 3) you want some combination of state and external actions in one big
>>> app. In this case, I'd consider splitting your app in two so that you
>>> can build state in both clusters while effecting external actions in
>>> only one cluster at a time.
>>>
>>> Lemme know if that makes sense.
>>>
>>> Ryanne
>>>
>>> On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare <
>>> adam.bellemare@gmail.com> wrote:
>>>
>>>> Hi Ryanne
>>>>
>>>> I think we have inconsistent definitions of Active-Active. The producer
>>>> is only producing to one cluster (primary) and one topic (topic
>>>> "table"), and the other cluster (secondary) contains only a replication
>>>> of the data via MM2 ("primary.table"). What you seemed to be proposing
>>>> is that the producer's "table" data is sent fully to each cluster, such
>>>> that the state can be materialized as a KTable in each application
>>>> running on each cluster. This wouldn't require MM2 at all, so I'm not
>>>> sure if this is what you advocated.
>>>>
>>>> You also state that "As with normal consumers, the Streams app should
>>>> *subscribe to any remote topics*, e.g. with a regex, s.t. the
>>>> application state will reflect input from either source cluster."
>>>> Wouldn't this mean that the stateful "table" topic that we wish to
>>>> materialize would be replicated by MM2 from Primary, such that we end
>>>> up with the following:
>>>>
>>>> Replicated Entity/Stateful Data:
>>>>
>>>> Primary Cluster: (Live)
>>>> Topic: "table" (contains data from T = 0 to T = n)
>>>> StreamsAppPrimary is consuming from ("table")
>>>>
>>>> Secondary Cluster: (Live)
>>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>>> StreamsAppSecondary is consuming from ("primary.table")
>>>>
>>>> What does StreamsAppSecondary do when "primary.table" is no longer
>>>> replicated because Primary has died? Additionally, where should the
>>>> producer of topic "table" now write its data to, assuming that Primary
>>>> Cluster is irrevocably lost?
>>>>
>>>> I hope this better outlines my scenario. The trivial solution seems to
>>>> be to make your producers produce all stateful data (topic "table") to
>>>> each cluster, which makes MM2 unnecessary, but can also lead to data
>>>> inconsistencies so it's not exactly foolproof.
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan wrote:
>>>>
>>>>> Hello Adam, thanks for the questions. Yes my organization uses
>>>>> Streams, and yes you can use Streams with MM2/KIP-382, though perhaps
>>>>> not in the way you are describing.
>>>>>
>>>>> The architecture you mention is more "active/standby" than
>>>>> "active/active" IMO. The "secondary" cluster is not being used until a
>>>>> failure, at which point you migrate your app and expect the data to
>>>>> already be there. This works for normal consumers where you can seek()
>>>>> and --reset-offsets. Streams apps can be reset with the
>>>>> kafka-streams-application-reset tool, but as you point out, that
>>>>> doesn't help with rebuilding an app's internal state, which would be
>>>>> missing on the secondary cluster. (Granted, that may be okay depending
>>>>> on your particular application.)
>>>>>
>>>>> A true "active/active" solution IMO would be to run your same Streams
>>>>> app in _both_ clusters (primary, secondary), s.t. the entire
>>>>> application state is available and continuously updated in both
>>>>> clusters. As with normal consumers, the Streams app should subscribe
>>>>> to any remote topics, e.g. with a regex, s.t. the application state
>>>>> will reflect input from either source cluster.
>>>>>
>>>>> This is essentially what Streams' "standby replicas" are -- extra
>>>>> copies of application state to support quicker failover. Without these
>>>>> replicas, Streams would need to start back at offset 0 and re-process
>>>>> everything in order to rebuild state (which you don't want to do
>>>>> during a disaster, especially!). The same logic applies to using
>>>>> Streams with MM2. You _could_ fail over by resetting the app and
>>>>> rebuilding all the missing state, or you could have a copy of
>>>>> everything sitting there ready when you need it. The easiest way to do
>>>>> the latter is to run your app in both clusters.
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> Ryanne
>>>>>
>>>>> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare <
>>>>> adam.bellemare@gmail.com> wrote:
>>>>>
>>>>>> Hi Ryanne
>>>>>>
>>>>>> I have a quick question for you about Active+Active replication and
>>>>>> Kafka Streams. First, does your org / do you use Kafka Streams? If
>>>>>> not then I think this conversation can end here. ;)
>>>>>>
>>>>>> Secondly, and for the broader Kafka Dev group - what happens if I
>>>>>> want to use Active+Active replication with my Kafka Streams app, say,
>>>>>> to materialize a simple KTable? Based on my understanding, a topic
>>>>>> "table" on the primary cluster will be replicated to the secondary
>>>>>> cluster as "primary.table". In the case of a full cluster failure for
>>>>>> primary, the producer to topic "table" on the primary switches over
>>>>>> to the secondary cluster, creates its own "table" topic and continues
>>>>>> to write to there. So now, assuming we have had no data loss, we end
>>>>>> up with:
>>>>>>
>>>>>> Primary Cluster: (Dead)
>>>>>>
>>>>>> Secondary Cluster: (Live)
>>>>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>>>>> Topic: "table" (contains data from T = n+1 to now)
>>>>>>
>>>>>> If I want to materialize state using Kafka Streams, obviously I am
>>>>>> now in a bit of a pickle since I need to consume "primary.table"
>>>>>> before I consume "table". Have you encountered rebuilding state in
>>>>>> Kafka Streams using Active-Active? For non-Kafka Streams I can see
>>>>>> using a single consumer for "primary.table" and one for "table",
>>>>>> interleaving the timestamps and performing basic event dispatching
>>>>>> based on my own tracked stream-time, but for Kafka Streams I don't
>>>>>> think there exists a solution to this.
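>>>>>> Roughly what I have in mind for the non-Streams case - just a
>>>>>> sketch, with illustrative config and types:
>>>>>>
>>>>>>     import java.time.Duration;
>>>>>>     import java.util.Comparator;
>>>>>>     import java.util.List;
>>>>>>     import java.util.PriorityQueue;
>>>>>>     import java.util.Properties;
>>>>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>>>>     import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>
>>>>>>     Properties props = new Properties();
>>>>>>     props.put("bootstrap.servers", "secondary:9092");
>>>>>>     props.put("group.id", "table-materializer");
>>>>>>     props.put("key.deserializer", StringDeserializer.class.getName());
>>>>>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>>>>>
>>>>>>     // One consumer over both topics; buffer records and release them
>>>>>>     // in timestamp order to approximate a single merged topic.
>>>>>>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>>>>>>     consumer.subscribe(List.of("primary.table", "table"));
>>>>>>     PriorityQueue<ConsumerRecord<String, String>> buffer =
>>>>>>         new PriorityQueue<>(Comparator.comparingLong(ConsumerRecord::timestamp));
>>>>>>     while (true) {
>>>>>>         consumer.poll(Duration.ofMillis(100)).forEach(buffer::add);
>>>>>>         // Drain records that lag my tracked stream-time by more than
>>>>>>         // some grace period (dispatch logic elided).
>>>>>>     }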
>>>>>>
>>>>>> If you have any thoughts on this or some recommendations for Kafka
>>>>>> Streams with Active-Active I would be very appreciative.
>>>>>>
>>>>>> Thanks
>>>>>> Adam