From: Ryanne Dolan <ryannedolan@gmail.com>
Date: Mon, 7 Jan 2019 12:23:06 -0600
Subject: Re: [DISCUSS] KIP-382: MirrorMaker 2.0
To: dev@kafka.apache.org
Hi Ewen, thanks for the questions.

> On the ACL management, can you explain how things are supposed to work...

There are two types of topics in play here: regular topics and remote topics. MM2 replicates regular source topics -> remote topics. MM2 doesn't create or modify regular topics, but it fully owns and manages remote topics, including their ACLs.

MM2 automatically syncs ACL changes from source topic to remote topic (but not the other way around), such that if an operator changes the ACL on a source topic, the corresponding remote topic is updated. Only MM2 can write to remote topics, and their ACLs are configured to enforce this.

Additionally, any READ rules for a source topic are propagated to the remote topic. This is important for consumer migration/failover to work reliably -- a failed-over consumer must have access to the replicated data in a foreign cluster. Keep in mind they are the same records, after all! Where there is an A -> B replication flow, a principal with read access to Cluster A's topic1 will also have read access to Cluster B's A.topic1 (a remote topic).

However, that does NOT mean that the same principal will automatically have access to Cluster B's topic1, since topic1 is not a remote topic. This is because the records in Cluster A's topic1 are NOT the same as the records in Cluster B's topic1, and in fact they may have vastly different access control requirements.

Consider the common arrangement where an organization has multiple Kafka clusters for prod vs staging, internet/DMZ vs intranet, etc. You might want to use MM2 to replicate a topic "foo" from prod to staging, for example. In this case, the topic will show up in the staging cluster as "prod.foo". MM2 will make sure that any principal that can read "foo" in prod can also read "prod.foo" in staging, since it's the same principal and the same data.
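To make the propagation rule concrete, here is a toy sketch of it in Python (the function and data shapes are illustrative only, not MM2's actual API): READ rules are copied from the source topic to its remote counterpart, while WRITE on the remote topic is reserved for MM2 itself.

```python
def remote_topic_acls(source_acls, mm2_principal):
    """Derive a remote topic's ACLs from its source topic's ACLs.

    Toy model of the rule described above (not MM2's real API):
    - READ rules on the source topic are propagated downstream,
      so failed-over consumers keep access to the same records.
    - WRITE on the remote topic is restricted to MM2 itself.
    """
    remote = {"READ": set(), "WRITE": {mm2_principal}}
    for principal, operation in source_acls:
        if operation == "READ":
            remote["READ"].add(principal)
        # WRITE (and other) rules are deliberately NOT propagated.
    return remote

# app1 can READ prod's "foo", so it can READ staging's "prod.foo";
# its WRITE rule does not follow it downstream.
acls = remote_topic_acls([("app1", "READ"), ("app1", "WRITE")], "mm2")
```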
You don't have to manually create or configure "prod.foo" -- you just tell MM2 to replicate "foo" from prod to staging. In this example, MM2 does not touch anything in the prod cluster -- it just reads from "foo". (In fact, it doesn't write to prod at all, not even offsets.) And importantly, any changes to staging topics don't affect anything in prod.

> is this purely for a mirroring but not DR and failover cases

DR (failover/failback, and client migration in general) is the primary motivation for the MM2 design. ACL sync in particular exists to ensure clients can migrate between clusters and still have access to the same data.

> In particular, the rules outlined state that only MM2 would be able to write on the new cluster

Only MM2 can write to _remote topics_ (on any cluster). That says nothing of normal topics.

> at some point you need to adjust ACLs for the failed-over apps to write

It depends. WRITE access is not sync'd across clusters by MM2, so you may need some other mechanism to manage that. This limitation is by design -- it's unsafe and generally undesirable to apply write access across clusters.

Consider the prod vs staging example again. If you are replicating "foo" from prod -> staging, you want app1 to have access to both prod's "foo" and staging's "prod.foo", since this is the same principal and the same data, just on separate clusters. But that doesn't mean you want prod apps to write to staging, nor staging apps to write to prod. This is probably the whole reason you have staging vs prod in the first place! Instead, you will want to be deliberate when promoting an application from staging to prod, which may involve manually granting WRITE access to prod topics.
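For reference, a prod -> staging flow like the one above would be configured with something along these lines in an MM2 properties file (a sketch in the spirit of the KIP's configuration examples; exact property names are subject to the final KIP, and the bootstrap addresses are placeholders):

```properties
clusters = prod, staging
prod.bootstrap.servers = prod-kafka:9092
staging.bootstrap.servers = staging-kafka:9092

# replicate "foo" from prod into staging's "prod.foo"
prod->staging.enabled = true
prod->staging.topics = foo

# ACL sync is on by default; disable it to manage ACLs yourself
# sync.topic.acls = false
```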
> how things are supposed to work when you need to migrate to the new cluster

If you have a set of clusters with MM2 syncing topics between them ("active/active"), you can migrate consumers among them using RemoteClusterUtils, which will figure out the new offsets, partitions, and topic names for you.

The topic names will be different after migration. There are two main scenarios: 1) a consumer is subscribed to a normal topic only, e.g. "topic1"; 2) a consumer is aggregating across topics from multiple clusters, e.g. "topic1" but also "B.topic1", "C.topic1"...

In (1), migrating a consumer from cluster A -> cluster B will result in the subscription being changed from "topic1" to "A.topic1". These are the same records in the same order, and the consumer can safely resume processing from the latest checkpoint.

In (2), you get:

topic1 -> A.topic1 (because B's A.topic1 came from A's topic1)
B.topic1 -> topic1 (because A's B.topic1 came from B's topic1)
C.topic1 -> C.topic1 (because this topic is remote on both A and B)

...and so on. RemoteClusterUtils does this logic for you. It's the same simple algorithm for any sort of migration, including failover and subsequent failback.

For _producers_, migration is very simple -- the topic name does not change. If you migrate a producer from cluster A -> cluster B, it still produces to "topic1", only it is now cluster B's topic1. This captures the fact that the data originates at cluster B. MM2 can then replicate this data back to A, but it will of course go to "B.topic1". Now, in order to migrate a producer to a new cluster, the principal will need WRITE access there. As stated above, MM2 will not sync this for you by design.

> ... compatibility mode ...

The legacy mode disables all the new features, including checkpoints, topic renaming, etc., so that MM2 can be used to replace MM1 in existing deployments. A wrapper script will honor ./bin/kafka-mirror-maker.sh's arguments to ease the transition.
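As an aside, the consumer subscription mapping described above can be sketched as a small function (a toy illustration of what RemoteClusterUtils would do for you, not its actual implementation; this naive version assumes the text before the first separator is a cluster alias, which real topic names containing "." can violate):

```python
def migrate_topic(topic, source, target, sep="."):
    """Rename a subscribed topic when migrating a consumer from
    cluster `source` to cluster `target` (toy sketch only)."""
    prefix = target + sep
    if topic.startswith(prefix):
        # the target's own topic was a remote topic on the source cluster
        return topic[len(prefix):]
    if sep not in topic:  # crude "is this a normal topic?" check
        # a normal topic on the source becomes a remote topic on the target
        return source + sep + topic
    # remote topics from third clusters keep their names
    return topic
```

For a migration from A to B this reproduces the mapping above: "topic1" -> "A.topic1", "B.topic1" -> "topic1", and "C.topic1" -> "C.topic1".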
But I wouldn't go so far as to call it compatible.

> Does that include custom extensions like MessageHandlers?

Michael Pearce suggested we should honor this API in MM2, which I believe is possible by trivially adapting MessageHandler to Connect's Transformation. I don't object to this.

> I may have missed something in this proposal since it's pretty long

... and now this email is pretty long :)

Thanks again,
Ryanne

On Fri, Jan 4, 2019 at 9:49 PM Ewen Cheslack-Postava wrote:

> Hey Ryanne,
>
> Sorry, late to the game here.
>
> On the ACL management, can you explain how things are supposed to work when you need to migrate to the new cluster? Or is this purely for a mirroring but not DR and failover cases? In particular, the rules outlined state that only MM2 would be able to write on the new cluster. But if you have a DR case, at some point you need to adjust ACLs for the failed-over apps to write. And presumably you want the original set of write ACLs to apply, so it'd need to apply them during some failover event?
>
> On the compatibility story, you mention running a compatibility mode. What does this mode do with ensuring settings match and offsets are reused? I see in the proposal that we define new settings, but is an existing MM1 config guaranteed to continue working? Does that include custom extensions like MessageHandlers? I'm not sure I entirely understand the compatibility story here (which could also be that we just don't provide one -- just want to make sure it is clear).
>
> I may have missed something in this proposal since it's pretty long, let me know if there was something obvious I overlooked.
>
> Thanks,
> -Ewen
>
> On Mon, Dec 31, 2018 at 12:57 PM Ryanne Dolan wrote:
>
> > > transactional messages [...] could result in frequent writes to the offset mapping topic.
> >
> > Becket, I think we could limit writes to a max frequency to ameliorate this problem.
> >
> > > I am wondering if users can just seek by timestamp and get a more precise mapping [...]
> > > I assume that MM2 will mirror the timestamps from source to target without being changed
> >
> > Yes, MM2 passes timestamps along, and also any headers. The timestamps are useful for measuring replication lag etc, but they are not particularly useful for consumer migration (failover etc). I can expound on this further if you like, but in practice offsets are better if you have them.
> >
> > > I don't think this should block MM2 given there are a lot of other benefits already
> >
> > Thanks! I appreciate your support.
> > Ryanne
> >
> > On Fri, Dec 28, 2018 at 9:54 PM Becket Qin wrote:
> >
> > > Hi Ryanne,
> > >
> > > Thanks for the reply. You are right. The topic naming and ACL sanity check should probably be a separate discussion.
> > >
> > > Regarding the offset mapping. Thought about this a bit more. We may need to consider the cases such as logs including transactional messages. In that case, the offset mapping may not be contiguous due to the existence of the transaction control messages. It could result in frequent writes to the offset mapping topic.
> > >
> > > I don't think this should block MM2 given there are a lot of other benefits already. That said, if we use periodic offset mapping records, I am wondering if users can just seek by timestamp and get a more precise mapping (at millisecond granularity) from the source cluster to the target cluster. But admittedly, this approach has its own limitations, such as users are expected to use LogAppendTime for the original topic.
> > > (BTW, I assume that MM2 will mirror the timestamps from source to target without being changed.)
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Dec 27, 2018 at 1:16 AM Ryanne Dolan wrote:
> > >
> > > > Becket, this is great feedback, thanks.
> > > >
> > > > > having a reserved character for the topics is probably something worth doing in general
> > > >
> > > > Agreed, but we shouldn't make this a requirement for MM2, or else it wouldn't work with older versions of Kafka, complicating adoption, testing etc. In particular, we'll want to prove MM2 against production workloads without first upgrading Kafka brokers.
> > > >
> > > > I think we should 1) make the separator configurable in MM2, defaulting to a period for now, 2) in a separate KIP, propose a special separator character as you suggest, 3) maybe update the default at some point.
> > > >
> > > > We can endeavor to do this all in the same release, which would have the effect you want.
> > > >
> > > > > It might be better to add a config like allowAclMismatch to let user decide what should be the right behavior, i.e. either fail a mirror if ACL mismatch, or mirror it with different ACLs.
> > > >
> > > > What would it mean to "fail a mirror"? I think it would be strange if replication suddenly stopped after someone changes an ACL somewhere.
> > > >
> > > > I think for users that want ACLs to mismatch, they'd just disable sync.topic.acls and manage ACLs themselves. I want MM2 to do the right thing by default, but I don't think it should be responsible for enforcing policies or protecting against changes beyond its control and purview.
> > > >
> > > > > seems possible to achieve per message granularity, given that there is a single writer to the remote topic [...]
> > > > > if the returned target offset is different from the expectation [...], MM2 emits a new mapping message to the offset mapping topic
> > > >
> > > > This is a fantastic idea. With this approach, we can reduce the recovery period to a very small number of offsets -- effectively the same as the latest commit. This isn't exactly one-to-one offset translation, but as I said we don't really need that -- we just want to get as close to the latest consumer commit as possible without going past it, which your approach does very well. Thanks for this improvement.
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, Dec 25, 2018 at 1:38 AM Becket Qin wrote:
> > > >
> > > > > Hi Ryanne,
> > > > >
> > > > > Thanks for the reply. Please see a few more comments below.
> > > > >
> > > > > 1. Reserved character for topic names.
> > > > >
> > > > > > I think the limitations on topic names are imposed by Kafka itself (beyond those from Zookeeper and the file system), so it might be possible to add a reserved character for this purpose. This would essentially promote the concept of "remote topics" from MirrorMaker to Kafka in general, which might be nice. In particular, we could add native support for remote topics to KafkaConsumer if we had a baked-in way to distinguish remote vs non-remote topics. Definitely worth investigating for a future KIP :)
> > > > >
> > > > > I do not worry much about the consumer side because it is fairly simple to expand the subscription from a classic topic to a local + remote topic, as long as users know the rule. I am more concerned about having a way to ensure the users can safely use the remote topic without potential issues.
> > > > > There were some earlier discussions about the hierarchical topic names / topic namespaces. Those are probably too much in our context, but having a reserved character for the topics is probably something worth doing in general. It seems simple enough and will help address the potential confusion between local / remote topic names.
> > > > >
> > > > > 2. ACLs
> > > > > I think in some cases users may want to have different ACLs in local and remote topics, but in some other cases, users may want to make sure they are the same to avoid unexpected behaviors. It might be better to add a config like allowAclMismatch to let users decide what should be the right behavior, i.e. either fail a mirror if ACLs mismatch, or mirror it with different ACLs.
> > > > >
> > > > > 3. Offset mapping between source and destination Kafka cluster.
> > > > > I haven't thought about this thoroughly, but it seems possible to achieve per message granularity, given that there is a single writer to the remote topic. What we can do is the following:
> > > > > 1. For the first message MM2 mirrors, it will always emit a [source offset, target offset] mapping to the offset mapping topic. (e.g. 99 -> 199)
> > > > > 2. After that, MM2 expects the offset in the destination partition to increment one by one, corresponding to each message mirrored from source. (100 -> 200, 101 -> 201, etc...)
> > > > > 3. At any given point, if the returned target offset is different from the expectation (very likely larger, otherwise there is message loss), MM2 emits a new mapping message to the offset mapping topic.
> > > > > (Supposedly, if 99 -> 199, then MM2 expects 199 -> 299, but if 199 -> 300, MM2 emits the pair (199 -> 300).)
> > > > >
> > > > > In this case, for any committed source offset, the target offset can be determined by doing the following:
> > > > > 1. Find the offset mapping entry which contains the source offset that is closest but no larger than the committed source offset. (e.g. committed offset 150 will be mapped to the entry (99 -> 199))
> > > > > 2. Add the offset difference, because we know that since that entry the offsets are increasing one by one. (target offset = 199 + (150 - 99) = 250)
> > > > >
> > > > > If the target offset is greater than the log end offset of the partition, that means the message has not been mirrored yet.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Thu, Dec 20, 2018 at 6:00 AM Ryanne Dolan <ryannedolan@gmail.com> wrote:
> > > > >
> > > > > > Becket, thanks for taking a look.
> > > > > >
> > > > > > > 1. Only relying on the topic naming seems a little fragile.
> > > > > >
> > > > > > I think the limitations on topic names are imposed by Kafka itself (beyond those from Zookeeper and the file system), so it might be possible to add a reserved character for this purpose. This would essentially promote the concept of "remote topics" from MirrorMaker to Kafka in general, which might be nice. In particular, we could add native support for remote topics to KafkaConsumer if we had a baked-in way to distinguish remote vs non-remote topics. Definitely worth investigating for a future KIP :)
> > > > > >
> > > > > > > 2. For ACLs, is it possible that the local topic and remote topic have different ACLs?
> > > > > > > Will the connector do the sanity check?
> > > > > >
> > > > > > MM2 won't do full two-way sync of ACLs between clusters, but instead will simply propagate ACL changes from source topics to their downstream remote topics. For example, if a principal can READ from topic1 in cluster A, the same principal should be able to READ A.topic1 in cluster B. Otherwise, migrating a consumer from A -> B wouldn't work reliably.
> > > > > >
> > > > > > OTOH, say a super-user force-changes the ACL for a remote topic; that change would not be sync'd upstream to the source topic. In all cases, the "normal" source topics are the source-of-truth for ACL and configuration of remote topics managed by MM2.
> > > > > >
> > > > > > So to your specific question: no, MM2 will not sanity-check remote topics, since MM2 should be the only principal making any changes to remote topics, just as MM2 is the only principal allowed to WRITE to remote topics.
> > > > > >
> > > > > > > 3. For the checkpoint. Because the consumer may commit at arbitrary offset, does the connector need to keep a mapping between each source offset to destination offset? If so how would that be done?
> > > > > >
> > > > > > Offset translation will not be possible for arbitrary offsets. Instead, checkpoints simply provide recent offsets that are safe for a foreign consumer to seek to. To accomplish this, MirrorSourceConnector will periodically (every checkpoint.interval.seconds) emit local-remote offset pairs for the records being replicated. The connector knows this mapping because it gets upstream offsets from the consumer and corresponding downstream offsets from the producer ACK. Then MirrorCheckpointConnector can use this sparse mapping when emitting checkpoints to the target cluster.
> > > > > >
> > > > > > For example, if a consumer group has committed up to offset 100, and the connector knows that offset 99 is equivalent to offset 199 on the target cluster, then the connector can safely emit a checkpoint for that consumer group with target offset 199 == source offset 99, since 99 < 100. Notice the consumer is actually past 100 on the source cluster, but it's only safe to seek to 199 on the target cluster.
> > > > > >
> > > > > > If we checkpoint every 5 seconds in this way, we can be sure that a failed-over consumer won't end up re-processing much more than 5 seconds of data.
> > > > > >
> > > > > > Thanks for the great questions.
> > > > > > Ryanne
> > > > > >
> > > > > > On Thu, Dec 13, 2018 at 12:07 AM Becket Qin <becket.qin@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Ryanne,
> > > > > > >
> > > > > > > Thanks for the KIP. In general, I think the proposal makes a lot of sense to me. Unifying Kafka clusters in different locations has been a very important scenario. Right now the open source community does not have a standard solution to that.
> > > > > > >
> > > > > > > A few comments/questions follow:
> > > > > > >
> > > > > > > 1. Only relying on the topic naming seems a little fragile. It seems difficult to prevent conflicting names.
> > > > > > > One workaround might be adding a config to the broker, preventing topics with characters such as "_" or "." from being created, unless it is from some specific client such as MM2 admin clients.
> > > > > > >
> > > > > > > 2. For ACLs, is it possible that the local topic and remote topic have different ACLs? Will the connector do the sanity check?
> > > > > > >
> > > > > > > 3. For the checkpoint. Because the consumer may commit at arbitrary offset, does the connector need to keep a mapping between each source offset to destination offset? If so how would that be done?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Thu, Dec 13, 2018 at 8:23 AM Ryanne Dolan <ryannedolan@gmail.com> wrote:
> > > > > > >
> > > > > > > > > Consuming system restarts and restates from compacted topics, using *.account_state
> > > > > > > >
> > > > > > > > Michael, I think I understand your concern. With MM2's remote topic concept, it's the consumer's responsibility to aggregate records across .*account_state. MM2 will not merge these for you (by design). This complicates your consumer that expects a single topic to be compacted across all clusters, since log-compaction does not span topics.
> > > > > > > >
> > > > > > > > Notice that MM2 isn't really breaking anything here. The problem is that you are using MirrorMaker to aggregate records from multiple clusters into a single topic, which MM2 will not do by default.
> > > > > > > >
> > > > > > > > It's easy to accomplish the same thing with MM2 though.
> > > > > > > > You have a couple options:
> > > > > > > >
> > > > > > > > 1) Turn off topic renaming or use "legacy mode". MM2 won't get in your way, but it won't help much for DR, failover/failback etc. You're on your own :)
> > > > > > > >
> > > > > > > > 2) Aggregate your topic downstream of MM2. Turn .*account_state into account_state.aggregate, which itself is log-compacted. KTables make this trivial!
> > > > > > > >
> > > > > > > > With (2) you still get the nice DR semantics. The KTable will represent the latest account states aggregated across all clusters. If your producers need to migrate/failover/failback, the KTable in each region will be eventually consistent, and downstream consumers won't notice. I'd recommend blacklisting .*aggregate in MM2 so these don't get replicated themselves.
> > > > > > > >
> > > > > > > > TBH, I thought about including this as a feature in MM2, s.t. you would get a foo.aggregate topic automatically. Personally, I think Streams is better suited than Connect for this, but I won't rule it out for a future KIP.
> > > > > > > >
> > > > > > > > Thanks again!
> > > > > > > > Ryanne
> > > > > > > >
> > > > > > > > On Wed, Dec 12, 2018 at 2:49 PM Michael Pearce <Michael.Pearce@ig.com> wrote:
> > > > > > > >
> > > > > > > > > Ok, so you have your transaction system running master-master.
> > > > > > > > >
> > > > > > > > > You have three DCs, regions, where-ever.
> > > > > > > > > Accounts are distributed across areas based on a number of factors.
> > > > > > > > >
> > > > > > > > > Accounts A + B are in Ireland
> > > > > > > > > Accounts C + D are in Germany
> > > > > > > > > Account E is in the UK
> > > > > > > > >
> > > > > > > > > Let's call the Kafka cluster in Ireland Z, Germany Y and UK X.
> > > > > > > > >
> > > > > > > > > So now I have topics:
> > > > > > > > >
> > > > > > > > > Z.account_state (holding state for accounts A + B)
> > > > > > > > > Y.account_state (holding state for accounts C + D)
> > > > > > > > > X.account_state (holding state for account E)
> > > > > > > > >
> > > > > > > > > To get a full view of all accounts a consumer would consume *.account_state. Great!
> > > > > > > > >
> > > > > > > > > Now imagine we take a planned or unplanned outage in the UK. We move/shift the transactional processing of account E to Ireland.
> > > > > > > > >
> > > > > > > > > Now we end up with:
> > > > > > > > > Z.account_state (holding state for accounts A + B + current state for account E)
> > > > > > > > > Y.account_state (holding state for accounts C + D)
> > > > > > > > > X.account_state (holding outdated state for account E)
> > > > > > > > >
> > > > > > > > > The consuming system restarts and restates from compacted topics, using *.account_state. It will get both current state and also outdated state for E. Which state would it use? And this gets worse as it scales and you move the processing of accounts around over time.
> > > > > > > > >
> > > > > > > > > Likewise the issue is the same without compacted state.
> > > > > > > > > Imagine the order of state changes is important by account globally. How do you re-order the state changes? An order guarantee can be given within a single topic partition only. You only care for order by the account (not for offsets being the same; we simply care that updates are in order and the latest state is at the head on restart -- we ignore offsets). So it doesn't matter if in cluster Z you see updates A1, A2, B1, A3, B2, E1, E2 and in cluster Y you see B1, B2, A1, E1, A2, A3, E2, as the ordering of the updates by account is preserved.
> > > > > > > > >
> > > > > > > > > With the topic solution you're suggesting, we would have no true way of replaying and re-constituting the order between X.account_state and Z.account_state topics in the case of E, as messages will be in different topics and partitions.
> > > > > > > > >
> > > > > > > > > -----Original Message-----
> > > > > > > > > From: Ryanne Dolan
> > > > > > > > > Sent: Wednesday, December 12, 2018 4:37 PM
> > > > > > > > > To: dev@kafka.apache.org
> > > > > > > > > Subject: Re: [DISCUSS] KIP-382: MirrorMaker 2.0
> > > > > > > > >
> > > > > > > > > > You haven't described how one would handle the ordering issues and also the compaction issues where transactional processing is master-master in regions, where the processing is sticky to region but, on failure or planned moves, processing of certain accounts moves regions.
> > > > > > > > >
> > > > > > > > > Michael, a couple points:
> > > > > > > > >
> > > > > > > > > - With "remote topics", the order of events is consistent between clusters. Cluster A's "topic1" is the same records in the same order as cluster B's "A.topic1". The plan is to enable exactly-once semantics within MM2 so there aren't additional dupes either (though I believe this will require support within Connect and a separate KIP).
> > > > > > > > >
> > > > > > > > > - A consumer that is only interested in events produced in a particular region can migrate to a cluster in a different region by updating its subscription accordingly. For example, a consumer in us-west processing events local to us-west would consume topics like "topic1" (a normal topic).
If you migrate this consumer to us-east, it would need to subscribe to "us-west.topic1" instead. It's clear from the naming convention that "us-west.topic1" is a replicated topic with records originating from a remote cluster.

I'm not sure I understand your concern w.r.t. compacted topics and state. Can you elaborate?

Ryanne

On Wed, Dec 12, 2018 at 8:41 AM Michael Pearce <Michael.Pearce@ig.com> wrote:

Ryanne,

You haven't described how one would handle the ordering issues and also the compaction issues where transactional processing is master-master in regions, where the processing is sticky to a region but, on failure or planned movement, processing of certain accounts moves regions.

Also I ask that you keep compatibility of the handler api interface in MM into MM2.

-----Original Message-----
From: Ryanne Dolan
Sent: Wednesday, December 12, 2018 6:41 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-382: MirrorMaker 2.0

> One based on hops using headers, and another based on topic naming.

Michael, this was also suggested by Alex Mironov.
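The renaming rule behind this migration is mechanical, so it can be sketched; the helper names below are hypothetical, but the "source-cluster-alias.topic" convention is the one described in the KIP discussion:

```python
def remote_topic(source_cluster_alias, topic):
    """MM2's naming convention: a replicated topic is prefixed with the
    alias of the cluster it originated from, e.g. "us-west" + "topic1"
    becomes "us-west.topic1" on the destination cluster."""
    return f"{source_cluster_alias}.{topic}"

def migrated_subscription(topics, home_cluster):
    """Hypothetical helper: the subscription a consumer moved off its
    home cluster would use on the new cluster to keep reading the same
    records in the same order."""
    return [remote_topic(home_cluster, t) for t in topics]

# A us-west consumer of "topic1", migrated to us-east:
assert migrated_subscription(["topic1"], "us-west") == ["us-west.topic1"]
```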
Other replication engines use headers as you describe, but there are several issues with this approach:

- The Connect framework provides Transformations that could be used for this purpose, so MM2 doesn't necessarily need to address this feature directly. For example, a Transformation could tag each record with its source cluster alias, or could decrement a hop/TTL value if you like.

- We want MM2 to benefit from "shallow iteration", meaning that e.g. compressed message sets should pass through MM2 without being decompressed and decomposed. In many cases, this can result in significant performance gains. But since these message sets are not decomposed into individual messages and so on, we cannot reliably tag each message that passes through.

- Fundamentally, I believe it is a bad idea to treat topics on different clusters as if they are the same topic. If two clusters have topics of the same name, they are still inherently different topics, as they exist on different clusters, with potentially different records and certainly different offsets. Moreover, there is no robust way to keep such topics consistent and in sync.
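The source-cluster-tagging Transformation mentioned in the first bullet can be sketched in a language-neutral way; in Connect this would be a Java SMT, and the record shape and header name here are illustrative assumptions:

```python
def tag_source_cluster(record, cluster_alias):
    """Sketch of a single-message transformation that stamps each record's
    headers with its source cluster alias. setdefault keeps an existing
    tag intact, so a record already tagged at its origin is not re-tagged
    on later hops."""
    headers = dict(record.get("headers", {}))
    headers.setdefault("source.cluster", cluster_alias)
    return {**record, "headers": headers}

tagged = tag_source_cluster({"topic": "topic1", "value": b"x"}, "us-west")
assert tagged["headers"]["source.cluster"] == "us-west"
```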
- Some applications are interested in events from a particular data center/region/cluster, while other applications will want to process events regardless of where they are produced. Indeed, a common use-case for building out multiple clusters in the first place is to support this sort of geolocation-aware processing and aggregation. It sounds like your org attempts to make topics the same everywhere, which is undesirable in many cases.

- Advanced operations such as automatic failover and failback rely on the ability to segregate records based on their cluster of origin, while preserving order within each topic-partition. This is extremely difficult if your app's producers and MM's producers "cross streams" by writing into the same topic. (Your mention of "ensure inflight processing at the old region ceased" etc. is hand-holding symptomatic of this problem.)

- With a consistent naming convention in place (along with checkpointing, heartbeats and the other "best practices" mentioned in the KIP), we can build tooling that understands multi-cluster environments.
For example, the KIP describes a utility for translating consumer offsets between clusters. This will enable operators to migrate consumer groups between clusters without knowing anything about the topics involved.

That all said, I am sure some organizations will want to apply their own conventions, and I don't believe MM2 should get in the way of that.

Thanks again!
Ryanne

On Tue, Dec 11, 2018 at 7:20 PM michael.andre.pearce wrote:

Another benefit of using hops vs topic naming (also how we currently do master-master in my org):

You have a transactional processing app that's multi-regioned. So for the sake of discussion, all A and B accounts process normally in the Ireland region, all C and D in the Germany region, and all E in the UK region. Scale the regions out across the globe, almost one per financial centre, where close regions might mesh together one-to-one and continental regions form some backbone trunks.

If the UK region goes down, planned or unplanned, we move the transactional routing and processing to the Germany region.
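The offset-translation utility mentioned above can be sketched at the level of its core idea: periodic checkpoints pair an upstream offset with the corresponding downstream offset, and a migrating consumer seeks to the latest checkpoint at or below its committed upstream position. The checkpoint representation here is an assumption for illustration, not MM2's actual format:

```python
import bisect

def translate_offset(checkpoints, upstream_offset):
    """checkpoints: sorted (upstream_offset, downstream_offset) pairs
    emitted by the replication flow. Returns a safe downstream position
    for a consumer that had committed upstream_offset (it may re-read a
    few records between checkpoints, but never skips any)."""
    ups = [u for u, _ in checkpoints]
    i = bisect.bisect_right(ups, upstream_offset) - 1
    if i < 0:
        return 0  # no checkpoint at or below this offset: start from 0
    return checkpoints[i][1]

cps = [(0, 0), (100, 90), (200, 185)]
assert translate_offset(cps, 150) == 90   # between checkpoints: rewind
assert translate_offset(cps, 250) == 185  # past the last checkpoint
```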
During this flip-over we ensure inflight processing at the old region has ceased and restate account states from topics before further processing, thus ensuring no out-of-order message production by account.

With region, broker, or some other topic prefixes, this means I now have a topic for the UK region with some E account data, and after the process movement I will end up with E accounts in the topic for the Germany region. Now ordering is lost. Worse still, if it is a compacted topic I will have two states, as the state would be spread across two topics.

With hops and a single named topic (as we do already in our own custom handlers, which do hop logic with today's MirrorMaker) we can avoid this issue entirely, as ordering by account is preserved, everything is still in one topic, and when using compaction we have only one state.

Before you say "why not name topics a., b.": I used that to simplify the case in order to describe it. Accounts are autogenerated, e.g. A16E45T1, and C43F4SA could be processing in the Germany region currently while C43F2SA could be in the UK region currently.
Sent from my Samsung Galaxy smartphone.

-------- Original message --------
From: Andrew Otto <otto@wikimedia.org>
Date: 11/12/2018 14:28 (GMT+00:00)
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-382: MirrorMaker 2.0

Wikimedia currently implements 'master <-> master' replication by manually prefixing topics with datacenter names, and then configuring MirrorMaker to only replicate topics that begin with a DC name to another.

While having topics named with topological details is manageable, I wouldn't say it is desirable. It pushes knowledge of the replication topology up to clients. Even if MirrorMaker were the one doing the topic prefixing, downstream consumers of a group of replicated topics are still going to have to know to subscribe to the correctly prefixed topics.

If possible I'd much prefer header + hops based replication rather than lots of renamed topics. But either way, this KIP would be tremendously useful to us, so I support it all the way! :)
On Tue, Dec 11, 2018 at 5:32 AM Michael Pearce <Michael.Pearce@ig.com> wrote:

So this is indeed what using headers with hops avoids: creating lots and lots of topics, so you can have more complex topology setups.

I ask: why not support two ways of setting this up, rather than closing the door? One based on hops using headers, and another based on topic naming. After all, flexibility is what we want, and it's for end users to decide how to use it, right?

On 12/7/18, 8:19 PM, "Ryanne Dolan" <ryannedolan@gmail.com> wrote:

Michael, thanks for the comments!

> would like to see support for this to be done by hops, as well [...] This then allows ring (hops = number of brokers in the ring), mesh (every cluster interconnected so hop=1), or even a tree (more fine grained setup) cluster topology.

That's a good idea, though we can do this at the topic level without tagging individual records.
A max.hops of 1 would mean "A.topic1" is allowed, but not "B.A.topic1". I think the default behavior would need to be max.hops = 1 to avoid unexpectedly creating a bunch of D.C.B.A... topics when you create a fully-connected mesh topology.

Looking ahead a bit, I can imagine an external tool computing the spanning tree of topics among a set of clusters based on inter-cluster replication lag, and setting up MM2 accordingly. But that's probably outside the scope of this KIP :)

> ...standalone MirrorMaker connector...
> ./bin/kafka-mirror-maker-2.sh --consumer consumer.properties --producer producer.properties

Eventually, I'd like MM2 to completely replace legacy MM, including the ./bin/kafka-mirror-maker.sh script. In the meantime, it's a good idea to include a standalone driver. Something like ./bin/connect-mirror-maker-standalone.sh with the same high-level configuration file. I'll do that, thanks.
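With the naming convention, the hop count is recoverable from the topic name itself, which is what makes a topic-level max.hops check possible without per-record tagging. A minimal sketch, assuming cluster aliases and the base topic name contain no dots (the config name `max.hops` follows the discussion above, not a finalized MM2 option):

```python
def hops(topic, separator="."):
    """Number of source-cluster prefixes already on a topic name:
    "topic1" -> 0, "A.topic1" -> 1, "B.A.topic1" -> 2.
    Assumes aliases and the base topic name contain no separator."""
    return topic.count(separator)

def should_replicate(topic, max_hops=1):
    """Default max.hops = 1: first-hop remote topics like "A.topic1"
    are created, but second-hop "B.A.topic1" topics are not."""
    return hops(topic) < max_hops

assert should_replicate("topic1")          # 0 hops so far: replicate
assert not should_replicate("A.topic1")    # already one hop: stop
```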
> I see no section on providing support for mirror maker Handlers; today people can add handlers to have a little extra custom logic if needed, and the handler api is public today so should be supported going forwards so people are not en masse re-writing these.

Great point. Connect offers single-message transformations and converters for this purpose, but I agree that we should honor the existing API if possible. This might be as easy as providing an adapter class between Connect's Transformation and mirror-maker's Handler. Maybe file a Jira ticket to track this?

Really appreciate your feedback!

Ryanne

On Thu, Dec 6, 2018 at 7:03 PM Michael Pearce <Michael.Pearce@ig.com> wrote:
Re hops to stop the cycle and to allow a range of multi-cluster topologies, see https://www.rabbitmq.com/federated-exchanges.html where something very similar was done in RabbitMQ.

On 12/7/18, 12:47 AM, "Michael Pearce" <Michael.Pearce@ig.com> wrote:

Nice proposal. Some comments.

On the section around cycle detection:

I would like to see support for this to be done by hops as well, e.g. the approach is to use a header for the number of hops; as MM2 replicates, it increases the hop count, and you can make MM2 configurable to only produce messages onwards where hops are less than x.

This then allows ring (hops = number of brokers in the ring), mesh (every cluster interconnected so hop=1), or even a tree (more fine grained setup) cluster topology.

FYI we do this currently with the current mirror maker, using a custom handler.
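The header-based hop budget described above is simple to sketch. The record shape, header name, and function name below are illustrative, not an actual MirrorMaker handler API; the logic is the one described: increment a hops header on each replication and stop producing once the limit is reached:

```python
def forward(record, max_hops):
    """Sketch of the hop-header approach used by the custom handlers
    described above: bump a 'hops' header on each replication leg and
    drop the record once the budget is spent."""
    hops = int(record["headers"].get("hops", 0))
    if hops >= max_hops:
        return None  # do not produce onwards: hop budget exhausted
    out = dict(record)
    out["headers"] = {**record["headers"], "hops": hops + 1}
    return out

# hop=1 gives a fully-meshed topology: one replication leg, then stop.
first_leg = forward({"value": b"x", "headers": {}}, 1)
assert first_leg["headers"]["hops"] == 1
assert forward(first_leg, 1) is None
```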
On the section around running a standalone MirrorMaker connector:

I would suggest making this as easy to run as the MirrorMakers are today, with a simple single sh script. I assume this is what is proposed in the section "Running MirrorMaker in legacy mode", but I would even do this before MM is removed, with a -2 variant, e.g.:

./bin/kafka-mirror-maker-2.sh --consumer consumer.properties --producer producer.properties

Lastly:

I see no section on providing support for mirror maker Handlers. Today people can add handlers to have a little extra custom logic if needed, and the handler api is public today, so it should be supported going forwards so people are not en masse re-writing these.
On 12/5/18, 5:36 PM, "Ryanne Dolan" <ryannedolan@gmail.com> wrote:

Sönke,

> The only thing that I could come up with is the limitation to a single offset commit interval

Yes, and other internal properties, e.g. those used by the internal consumers and producers, which, granted, probably are not often changed from their defaults, but that apply to Connectors across the entire cluster.

Ryanne

On Wed, Dec 5, 2018 at 3:21 AM Sönke Liebau wrote:

Hi Ryanne,

when you say "Currently worker configs apply across the entire cluster, which is limiting even for use-cases involving a single Kafka cluster.", may I ask you to elaborate on those limitations a little?
The only thing that I could come up with is the limitation to a single offset commit interval value for all running connectors. Maybe also the limitation to shared config providers.

But you sound like you had painful experiences with this before; maybe you'd like to share the burden :)

Best regards,
Sönke

On Wed, Dec 5, 2018 at 5:15 AM Ryanne Dolan <ryannedolan@gmail.com> wrote:

Sönke,

I think so long as we can keep the differences at a very high level (i.e. the "control plane"), there is little downside to MM2 and Connect coexisting. I do expect them to converge to some extent, with features from MM2 being pulled into Connect whenever this is possible without breaking things.
I could definitely see your idea re hierarchies or groups of connectors being useful outside MM2. Currently "worker configs" apply across the entire cluster, which is limiting even for use-cases involving a single Kafka cluster. If Connect supported multiple workers in the same cluster, it would start to look a lot like a MM2 cluster.

Ryanne

On Tue, Dec 4, 2018 at 3:26 PM Sönke Liebau wrote:

Hi Ryanne,

thanks for your response!

It seems like you have already done a lot of investigation into the existing code and the solution design, and all of what you write makes sense to me. Would it potentially be worth adding this to the KIP, now that you had to write it up because of me anyway?

However, I am afraid that I am still not entirely convinced of the fundamental benefit this provides over an extended Connect that has the following functionality:
- allow for organizing connectors into a hierarchical structure - "clusters/us-west/..."
- allow defining external Kafka clusters to be used by Source and Sink connectors instead of the local cluster

Personally I think both of these features are useful additions to Connect; I'll address both separately below.
Allowing to structure connectors in a hierarchy:
Organizing running connectors will grow more important as corporate customers adopt Connect and installations grow in size. Additionally, this could be useful for ACLs in case they are ever added to Connect, as you could allow specific users access only to specific namespaces (and until ACLs are added it would facilitate using a reverse proxy for the same effect).
Allow accessing multiple external clusters:
The reasoning for this feature is pretty much the same as for a central Mirror Maker cluster: if a company has multiple clusters for whatever reason but wants to have ingest centralized in one system, aka one Connect cluster, it would need the ability to read from and write to an arbitrary number of Kafka clusters.

I haven't really looked at the code, just poked around a couple of minutes, but it appears this could be done with fairly low effort. My general idea would be to leave the existing configuration options untouched: Connect will always need a "primary" cluster that is used for storage of internal data (config, offsets, status), so there is no need to break existing configs. But additionally allow adding named extra clusters by specifying options like:

external.sales_cluster.bootstrap_servers=...
external.sales_cluster.ssl.keystore.location=...
external.marketing_cluster.bootstrap_servers=...

The code for status, offset and config storage is mostly isolated in the Kafka[Offset|Status|Config]BackingStore classes and could remain pretty much unchanged.

Producer and consumer creation for Tasks is done