From: Dong Lin
Date: Mon, 4 Jun 2018 18:59:29 -0700
Subject: Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
To: dev@kafka.apache.org

Hi all,

Not sure if there is further concern with this KIP. The vote for KIP-232 is also blocked on the discussion of this KIP. I am happy to continue the discussion and move this KIP forward.

Thanks,
Dong

On Tue, Apr 17, 2018 at 8:58 PM, Jeff Chao wrote:
> Hi Dong, I took a look at KIP-287. Producers writing with the new scheme prior to processor catch-up and cut-over makes sense. Thanks.
>
> On Sat, Apr 14, 2018 at 7:09 PM, Dong Lin wrote:
>> Hey Jeff,
>>
>> Thanks for the review. The scheme for expanding the processors of a stateful processing job is described in the "Support processor expansion" section of KIP-287. In particular, we will expand the partitions of the input topics before expanding the processors of the processing job. While the new processor is catching up, the producer would already be producing using the new partitioning scheme. Does this answer your question?
>>
>> Thanks,
>> Dong
>>
>> On Sat, Apr 14, 2018 at 1:44 PM, Jeff Chao wrote:
>>> Hi, I had a question from Monday's meeting. The current mechanism, as Ray's notes point out, is to create a copy consumer and switch over once it catches up. Meanwhile, it looks like a producer would still be writing using the old partitioning scheme. Wouldn't there be a case where the copy consumer never catches up, given a high enough produce rate? I imagine this working similarly to Kinesis's resharding mechanism, though I may be missing something from the current proposal. Anyway, with Kinesis as an example, this would mean a producer would stop writing using the old scheme and start writing using the new scheme. That way, the consumer will be able to catch up, after which it will start consuming using the new scheme.
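>>>
>>> A rough sketch of the catch-up check I have in mind (illustrative only; "consumer" is assumed to be a KafkaConsumer in the copy consumer's group, and the topic name is made up):
>>>
>>>     // Has the copy consumer reached the end of one old partition?
>>>     TopicPartition tp = new TopicPartition("old-topic", 0);
>>>     OffsetAndMetadata committed = consumer.committed(tp);
>>>     long end = consumer.endOffsets(Collections.singleton(tp)).get(tp);
>>>     boolean caughtUp = committed != null && committed.offset() >= end;
>>>     // The producer would keep writing with the old scheme until caughtUp
>>>     // holds for every partition, then cut over to the new scheme.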
>>>
>>> Thanks,
>>> Jeff Chao
>>>
>>> On Sat, Apr 14, 2018 at 6:44 AM, Dong Lin wrote:
>>>> Thanks for the notes by Jun and Ray. I have read through the notes. It seems that there are a few questions about the alternative solution by Jan; maybe Jan will answer these questions later?
>>>>
>>>> I have summarized the solution, which I previously provided in this thread, in KIP-287, to hopefully show that we have a promising direction for solving the partition and processor expansion problem for stateful processing jobs. Maybe we can continue the discussion related to stateful processing in that discussion thread. If that looks good, maybe we can conclude the discussion for KIP-253.
>>>>
>>>> Not sure if I missed some ideas or questions in the notes. Is there a specific concern with the latest KIP-253?
>>>>
>>>> On Mon, Apr 9, 2018 at 11:18 AM, Ray Chiang wrote:
>>>>> My notes from today's meeting. Sorry if I got anyone's name wrong. Plus I missed a few moments with noise at home and/or dropped video.
>>>>>
>>>>> -Ray
>>>>>
>>>>> =====
>>>>>
>>>>> KIP-253 Discussion
>>>>>
>>>>> - Currently, adding partitions can cause keys to be read out of order. This KIP is trying to preserve the key ordering when adding partitions.
>>>>>
>>>>> - State management in applications (e.g. Kafka Streams) can maintain local state via caching. If the number of partitions changes, how would those applications update their local state? This is the current point of discussion/disagreement.
>>>>>
>>>>> - Jan Filipiak is mostly worried about log-compacted topics. Not as concerned about producer swapping. Worried that the consumer design is a bit contradictory compared to the architecture.
>>>>>
>>>>> - Current design is to start up a new consumer in parallel with the old topic/consumer. Run until the consumer finishes "copying" to the new topic. Once the consumer is caught up, point the producer at the new topic.
>>>>>
>>>>> - Would like to have this technique as a "core primitive" in Kafka.
>>>>>   - Is this a useful primitive?
>>>>>   - What's the best way to support it?
>>>>>
>>>>> - Topic expansion as it currently exists just "adds partitions". But how does this affect bootstrapping applications? How to deal with keys "moved" from an old partition to a new expanded partition?
>>>>>
>>>>> - Dong's proposal example: 10 partitions growing to 15, with 5 of the first 10 partitions each split in two. Say Kafka remembers the parent->child relationship; then for each split parent partition there are two child partitions. Initially there were 10 states to manage, so a bootstrapping new application would have 15 states. Consumers need to know which "generation" of a partition they are consuming from. Until you get to the "newer" generation of data, the keys will be fine (i.e. reading from the old partition).
>>>>>
>>>>> - Scheme works well for transient events. Any stateful processor will likely break.
>>>>>
>>>>> - Tracking can become extremely complicated, since each split potentially requires more and more offset/partition combos.
>>>>>
>>>>> - Need to support continuation for consumers to read the new partitions.
>>>>>
>>>>> - With linear hashing and an integral-multiple increase (2x, 3x, 4x, etc.), there is an easier mapping from old partition sets to new partition sets. Keys end up in a clean hierarchy instead of a major reshuffling. (See the sketch at the end of these notes.)
>>>>>
>>>>> - Dong's approach piggybacks on the existing leader epoch. The log segment could be tagged with a version in the linear hashing case.
>>>>>
>>>>> - In Jan's case, existing consumers bootstrap from the beginning.
>>>>>
>>>>> - James' use case: using Kafka as a long-term persistent data store. Putting "source of truth" information into Kafka. The bootstrap case is very important; new applications could be bootstrapping as they come up.
>>>>>
>>>>> - Increasing partitions will help with producer load and will increase consumer parallelism.
>>>>>   - How does Kinesis handle partition splits? They don't have compacted logs, so no issue with bootstrapping. Kinesis uses MD5 and splits results based on the md5sum into bucket ranges.
>>>>>   - Is it useful for the server to know the partitioning function? The consumer has some implicit assumptions about keyed partitions, but they are not strongly enforced on the server side.
>>>>>
>>>>> - KIP-213 (one-to-many joins in Kafka Streams)
>>>>>   - MySQL case: the primary key is forced to be used as the Kafka key.
>>>>>
>>>>> (Sorry, had some audio and video drop at this point.)
>>>>>
>>>>> - MirrorMaker: the source cluster has a custom partitioning function, and the producer won't duplicate the same partitioning setup as the source. Need to provide the same partitioning function to the producer, and would need to determine the partitioning function based on the topic.
>>>>>
>>>>> - Should the server validate partitioning?
>>>>>   - Who does the actual determination of which key goes to which partition?
>>>>>
>>>>> - How to implement backfill?
>>>>>   - Who will do it? In the producer? Hard to do: every client would need to add this functionality. Better to do it on the server side.
>>>>>   - Add a type of "copy consumer"? Add backoff to the producer? Benefit of doing it in the consumer vs. the producer?
>>>>>
>>>>> - Still TBD:
>>>>>   - How to dedupe control messages?
>>>>>   - How to deal with subtle cases during the transition?
>>>>>   - Is it useful for the server to have the option to validate the key distribution?
>>>>>   - Jan is concerned about how a consumer application would look with the new "split partition" design.
>>>>>   - The KIP introduced a callback, which Jan doesn't think is useful. The callback is for switching between, e.g., finishing Partition 1 and starting on Partition 11; Jan would rely on a marker in Partition 1 instead. The intent of the callback is to cover the possibility that delivery of messages for a given key moves to a different consumer instance.
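>>>>>
>>>>> - Sketch of the linear-hashing mapping mentioned above (an illustration, not from the meeting; it assumes the keyed path of Kafka's default murmur2-based partitioner):
>>>>>
>>>>>       import org.apache.kafka.common.utils.Utils;
>>>>>
>>>>>       public class LinearHashingDemo {
>>>>>           // Same hashing the default partitioner applies to keyed records.
>>>>>           static int partition(byte[] key, int numPartitions) {
>>>>>               return Utils.toPositive(Utils.murmur2(key)) % numPartitions;
>>>>>           }
>>>>>
>>>>>           public static void main(String[] args) {
>>>>>               int oldCount = 10, newCount = 20; // a 2x "integral multiple" expansion
>>>>>               byte[] key = "user-42".getBytes();
>>>>>               int oldP = partition(key, oldCount);
>>>>>               int newP = partition(key, newCount);
>>>>>               // Because oldCount divides newCount, newP % oldCount == oldP:
>>>>>               // each old partition p has exactly two children, p and p + oldCount.
>>>>>               System.out.printf("old=%d new=%d parent=%d%n", oldP, newP, newP % oldCount);
>>>>>           }
>>>>>       }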
>>>>>
>>>>> On 4/6/18 9:44 AM, Dong Lin wrote:
>>>>>> Hey John,
>>>>>>
>>>>>> Thanks much for your super-detailed explanation. This is very helpful.
>>>>>>
>>>>>> Now that I have finished reading through your email, I think the proposed solution in my previous email probably meets requirement #6 without requiring additional coordination (w.r.t. the partition function) among clients. My understanding of requirement #6 is that, after partition expansion, messages with a given key will go to the same consumer before and after the partition expansion, such that stream processing jobs won't be affected. Thus this approach seems to be better than backfilling, since it does not require data copy for the input topics.
>>>>>>
>>>>>> In order for the proposed solution to meet requirement #6, we need two extra requirements in addition to what was described in the previous email: 1) the stream processing job starts with the same number of processors as the initial number of partitions of the input topics; and 2) at any given time, the number of partitions of the input topic >= the number of processors of the given stream processing job.
>>>>>>
>>>>>> Could you take a look at the proposed solution and see if any of the claims above is false?
>>>>>>
>>>>>> Hey Jan,
>>>>>>
>>>>>> Maybe it is more efficient for us to discuss your concern in the KIP meeting.
>>>>>>
>>>>>> Thanks,
>>>>>> Dong
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 2:05 PM, John Roesler wrote:
>>>>>>> Hi Jun,
>>>>>>>
>>>>>>> Thanks for the response. I'm very new to this project, but I will share my perspective. I'm going to say a bunch of stuff that I know you know already, but just so we're on the same page...
>>>>>>>
>>>>>>> This may also be a good time to get feedback from the other KStreams folks.
>>>>>>>
>>>>>>> Using KStreams as a reference implementation for how stream processing frameworks may interact with Kafka, I think it's important to eschew knowledge about how KStreams currently handles internal communication, makes state durable, etc., both because these details may change and because they won't be shared with other stream processors.
>>>>>>>
>>>>>>> =================================
>>>>>>> Background
>>>>>>>
>>>>>>> We are looking at a picture like this:
>>>>>>>
>>>>>>>      input   input   input
>>>>>>>          \     |     /
>>>>>>>        +---------------+
>>>>>>>    +---+  Consumer(s)  +---+
>>>>>>>    |   +---------------+   |
>>>>>>>    |                       |
>>>>>>>    | KStreams Application  |
>>>>>>>    |                       |
>>>>>>>    |   +---------------+   |
>>>>>>>    +---+  Producer(s)  +---+
>>>>>>>        +---------------+
>>>>>>>            /       \
>>>>>>>       output       output
>>>>>>>
>>>>>>> The inputs and outputs are Kafka topics (and therefore have 1 or more partitions). We'd have at least 1 input and 0 or more outputs. The Consumers and Producers are both the official KafkaConsumer and KafkaProducer.
>>>>>>>
>>>>>>> In general, we'll assume that the input topics are provided by actors over which we have no control, although we may as well assume they are friendly and amenable to requests, and also that their promises are trustworthy. This is important because we must depend on them to uphold some promises:
>>>>>>> * That they tell us the schema of the data they publish, and abide by that schema. Without this, the inputs are essentially garbage.
>>>>>>> * That they tell us some defining characteristics of the topics (more on this in a sec) and again strictly abide by that promise.
>>>>>>>
>>>>>>> What are the topic characteristics we care about?
>>>>>>> 1. The name (or name pattern)
>>>>>>> 2. How the messages are keyed (if at all)
>>>>>>> 3. Whether the message timestamps are meaningful, and if so, what their meaning is
>>>>>>> 4. Assuming the records have identity, whether the partitions partition the records' identity space
>>>>>>> 5. Whether the topic completely contains the data set
>>>>>>> 6. Whether the messages in the topic are ordered
>>>>>>>
>>>>>>> #1 is obvious: without this information, we cannot access the data at all.
>>>>>>>
>>>>>>> For #2, #3, #4, and #6, we may or may not need this information, depending on the logic of the application. For example, a trivial application that simply counts all events it sees doesn't care about #2, #3, #4, or #6. But an application that groups by some attribute can take advantage of #2 and #4 if the topic data is already keyed and partitioned over that attribute. Likewise, if the application includes some temporal semantics on a temporal dimension that is already captured in #3, it may take advantage of that fact.
>>>>>>>
>>>>>>> Note that #2, #3, #4, and #6 are all optional. If they are not promised, we can do extra work inside the application to accomplish what we need. However, if they are promised (and if we depend on that promise), it is essential that the topic providers uphold those promises, as we may not be in a position to verify them.
>>>>>>>
>>>>>>> Note also that if they make a promise, but it doesn't happen to line up with our needs (data is keyed by attr1, but we need it by attr2, or the timestamp is produce-time, but we need event-time, etc.), then we will have to go ahead and do that extra work internally anyway. This also captures the situation in which two inputs are produced by different providers, one of which meets our needs, and the other does not. The fact that we can cope with this situation is the basis for my statement that we do not require coordination among producers.
>>>>>>>
>>>>>>> (Key Point A): In terms of optimization, #4 and #6 are the most valuable. If these characteristics happen to line up with our needs, then KStreams can be incredibly efficient in both time and computational resources.
>>>>>>>
>>>>>>> #5 is similar to knowing the schema in that it tells us whether the computation we want to do is possible or not. For example, suppose we have a topic of "users", and we want to construct a table for querying. If the users topic doesn't completely contain the dataset, we cannot construct the table. Note that it doesn't matter whether the topic is compacted or not. If the topic is complete, I can consume it starting at "earliest" and build my table. If it is not complete, I can do other computations on it. In both cases, it may or may not be compacted; it just doesn't matter.
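>>>>>>>
>>>>>>> (A sketch of what "consume starting at 'earliest' and build my table" looks like — illustrative only, with minimal error handling; the topic name and config values are made up:)
>>>>>>>
>>>>>>>     Properties props = new Properties();
>>>>>>>     props.put("bootstrap.servers", "localhost:9092");
>>>>>>>     props.put("group.id", "table-builder");
>>>>>>>     props.put("key.deserializer", StringDeserializer.class.getName());
>>>>>>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>>>>>>     props.put("auto.offset.reset", "earliest"); // start from the beginning
>>>>>>>
>>>>>>>     Map<String, String> table = new HashMap<>();
>>>>>>>     try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>>>>>>>         consumer.subscribe(Collections.singletonList("users"));
>>>>>>>         while (true) {
>>>>>>>             for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
>>>>>>>                 if (rec.value() == null) table.remove(rec.key()); // tombstone
>>>>>>>                 else table.put(rec.key(), rec.value());          // latest value wins
>>>>>>>             }
>>>>>>>         }
>>>>>>>     }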
>>>>>>>
>>>>>>> On the output side, the roles are reversed. We provide (or not) exactly the same set of guarantees to the consumers of our outputs, and we likewise must abide by the promises we make.
>>>>>>>
>>>>>>> =================================
>>>>>>> Partition Expansion
>>>>>>>
>>>>>>> With this formation in place, let's talk about partition expansion.
>>>>>>>
>>>>>>> Why do we have partitions in the first place? (Let me know if I miss something here.)
>>>>>>> * For logical data streams that are themselves partitionable, it allows producers to operate concurrently without coordination. For example, when streaming data from a sensor in a particle accelerator, the sensor can be subdivided into a grid and each grid square can produce independently to a different topic. This may be valuable because the total rate of data exceeds the throughput to a single broker node, or just because it means the failure of a single producer causes the loss of only part of the data.
>>>>>>> * The brokers can offer throughput that scales linearly with the number of partitions by hosting each partition on a separate broker node.
>>>>>>> * The brokers can host topics that are too large to fit on a single broker's storage by hosting some partitions on separate broker nodes.
>>>>>>> * In cases where the use case permits handling partitions independently, consumers gain algorithmic simplicity by processing the data for separate partitions in separate threads, avoiding costly and error-prone concurrent coordination code.
>>>>>>> * In cases where the use case permits handling partitions independently, consumers can exceed the total throughput of a single broker-consumer pair.
>>>>>>> * Just to throw this in as well: in cases where some network links are less costly than others (or lower latency, or more reliable), such as when brokers, producers, and consumers are running in racks, producers and consumers can both benefit (independently) by locating the work for each partition in the same rack as the broker hosting that partition.
>>>>>>>
>>>>>>> In other words, we have three actors in this system: producers, brokers, and consumers, and they all benefit from partitioning for different (but sometimes related) reasons.
>>>>>>>
>>>>>>> This leads naturally to the conclusion that any of these actors may find themselves in a sub-optimal or even dangerous situation in which partition expansion would be the solution. For example, the producer may find that the existing throughput to the brokers is insufficient to match the data rate, forcing them to drop data. Or a broker hosting a single partition may be running out of disk space. Or a consumer node handling a single partition cannot match the rate of production for that partition, causing it to fall behind.
>>>>>>>
>>>>>>> I think it's reasonable to assume that all the actors in the system can't just arbitrarily expand a topic's partitions. I think it's reasonable to align this responsibility with the provider of the data, namely the producer (the logical producer, not the KafkaProducer class). Therefore, a producer who finds themselves in trouble can unilaterally expand partitions to solve their problem.
>>>>>>>
>>>>>>> For a broker or a consumer in trouble, they have only one resort: to request that the producer expand partitions. This is where it's helpful to assume the producer is friendly.
>>>>>>>
>>>>>>> Now, let's look at how a KStreams application fits into this scenario.
>>>>>>>
>>>>>>> (Key Point B1): As a consumer, we may find that the producer expands the partitions of a topic, either for their own benefit or for the brokers'. In this situation, the expand operation MUST NOT violate any promises that have previously been made to us. This is the essence of KIP-253: to ensure the maintenance of promises #6 and #4. It would be great if the mechanics of the expansion required no major disruption to processing or human intervention.
>>>>>>>
>>>>>>> Specifically, let's say that input partition X splits into X1 and X2. #6 requires that the same old ordering guarantees of Kafka continue to hold; obviously, this is what KIP-253's title is about. #4 requires that we either ensure that X1 and X2 are assigned to the same thread that was previously assigned X, OR that we immediately pause processing and split any state such that it appears X1 and X2 were *always* separate partitions.
>>>>>>>
>>>>>>> In other words, Option 1 is that we treat X1 and X2 as still logically one partition, equal to X. This is ideal, since in this scenario partitions are expanding for external reasons; we don't need to expand our processing to match. Option 2 requires a major disruption, since we'd have to pause processing while we split our state. Clearly, KStreams or any other stateful consumer would advocate for Option 1.
>>>>>>>
>>>>>>> (Corollary to Key Point A): Still on the consumer side, we may find that we ourselves can benefit from partition expansion of an input. Since we can cope with the absence of promise #4, partition expansion is not a hard requirement for us, but assuming we were already benefiting from the major performance optimizations afforded by #4, it would be nice to be able to ask the producer to satisfy our request for partition expansion *and to be able to benefit from it*.
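>>>>>>>
>>>>>>> (A concrete, if blunt, reading of Option 1 above: the consumer that owned X pins both halves to itself via manual assignment. The partition numbers are made up, and assign() trades away consumer-group rebalancing — a sketch, not a recommendation:)
>>>>>>>
>>>>>>>     // X (partition 0) split into X1 (still 0) and X2 (say 10, under a 2x split):
>>>>>>>     consumer.assign(Arrays.asList(
>>>>>>>         new TopicPartition("input", 0),
>>>>>>>         new TopicPartition("input", 10)));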
>>>>>>>
>>>>>>> What does it mean to be able to benefit from partition expansion? Assuming input topic partition X splits into X1 and X2, in this scenario we *would* wish to be able to split our state such that it appears X1 and X2 were *always* separate partitions. Of course, the conclusion of Key Point B1 still applies: we should be able to continue operating on (X1 + X2 = X) as one partition while asynchronously building the state of X1 and X2 separately.
>>>>>>>
>>>>>>> When it comes to the mechanics of building the state of X1 and X2 separately, we have really just two high-level options. Either this problem is solved by Kafka itself, giving me a view in which X1 and X2 were always separate partitions, or I have to do it myself. The latter means that I have to take on substantially more complexity than I do today:
>>>>>>> Bummer 1: My state has to be splittable to begin with, implying at the least that I need to be able to scan every record in my state, a requirement that otherwise does not exist.
>>>>>>> Bummer 2: After splitting the state into X1's and X2's, I need to be able to send at least one of those tasks, state included, to another application node (in order to realize the benefit of the split). This is also a requirement that does not currently exist.
>>>>>>> Bummer 3: In order to actually perform the split, I must know, and be able to execute, the exact same partition function the producer of my topic uses. This introduces a brand-new a priori commitment from my input producers (a would-be #7: convey the partition function and abide by it). This is a big restriction over #4, which only requires them to guarantee *that there is a partition function*. Now they actually have to share the function with me. And I have to be able to implement and execute it myself. And if the producer wishes to refine the partition function for an existing topic, we have another round of coordination, as they have to be sure that I, and all other consumers, begin using the new function *before* they do. This is similar to the schema problem, with a similar set of solutions: we would likely need a partition-function registry and another magic byte on every record to be sure we do this right. Not to mention some way to express arbitrary partitioning logic over arbitrary data in a way that is portable across programming languages.
>>>>>>>
>>>>>>> Alternatively, if Kafka gives me a view in which X1 and X2 were always separate, then I can create tasks for X1 and X2 and allow them to bootstrap while I continue to process X. Once they are ready, I can coordinate a transition to stop X's task and switch to X1 and X2. None of those bummers are present, so this is a significantly better option for me.
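>>>>>>>
>>>>>>> (For contrast, the do-it-myself split of those bummers looks roughly like this — scan everything and re-bucket with the producer's partition function, which I must somehow know. All names here are made up, and murmur2 is an assumed partition function:)
>>>>>>>
>>>>>>>     Map<String, byte[]> stateX1 = new HashMap<>();
>>>>>>>     Map<String, byte[]> stateX2 = new HashMap<>();
>>>>>>>     for (Map.Entry<String, byte[]> e : stateForX.entrySet()) {   // Bummer 1: full scan
>>>>>>>         int p = Utils.toPositive(Utils.murmur2(e.getKey().getBytes()))
>>>>>>>                 % newPartitionCount;                             // Bummer 3: producer's function
>>>>>>>         (p == x1Partition ? stateX1 : stateX2).put(e.getKey(), e.getValue());
>>>>>>>     }
>>>>>>>     // Bummer 2: now ship one of these maps, state and all, to another node.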
>>>>>>>
>>>>>>> (Key Point B2): As a (friendly) producer, we may once again want to expand partitions on our own, or we may want to satisfy a request from the broker or our consumers to do so. Again, we MUST NOT violate any promises we have previously given, and it would be great if the expansion required no major disruption to processing or human intervention. Additionally, since we are actually driving the expansion, it would also be great if we could avoid Bummer 3's coordination problems from the producer's perspective.
>>>>>>>
>>>>>>> ======================================
>>>>>>> Briefly: KStreams internals
>>>>>>>
>>>>>>> I'm pretty sure you were asking me to comment on the implementation details of KStreams, so I'll say a few words about it. The most important thing is that KStreams is still very early in its development; maybe "early-middle maturity" is a good way to put it. We are actively discussing more-or-less major implementation changes to improve performance, footprint, scalability, and ergonomics. So it may actually be misleading to discuss deeply how KStreams internally uses Kafka topics.
>>>>>>>
>>>>>>> Nevertheless, it is currently true that KStreams uses Kafka topics for communication between some internal computation nodes. We partition these topics as the base unit of concurrency granularity, so it would potentially be beneficial to be able to expand partitions for some of these internal topics at some point. However, we can alternatively just overpartition these internal topics, creating partitions in the low 100s instead of the low 10s, for example. (Side note: if Kafka were someday to support higher numbers of partitions, we could expand this scheme to overpartition in the 1000s of partitions.) With the option to overpartition, we don't have a strong need for partition expansion internally.
>>>>>>>
>>>>>>> It is also currently true that KStreams uses Kafka to store a durable changelog for some of our internal state stores. But we *only* read from this topic *if* we need to restore a state store after node loss (or to maintain a hot mirror of the state store), so I think it's unlikely that we would ever make use of partition expansion on the changelog topics.
>>>>>>>
>>>>>>> But once again, I'd like to emphasize that we may choose an alternative implementation for either interprocess communication or state durability.
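>>>>>>>
>>>>>>> (Overpartitioning is just a topic-creation choice — e.g., via the AdminClient. The name, counts, and replication factor here are illustrative:)
>>>>>>>
>>>>>>>     try (AdminClient admin = AdminClient.create(adminProps)) {
>>>>>>>         // Create the internal topic with partitions in the low 100s up front,
>>>>>>>         // even though only a few tasks consume it today.
>>>>>>>         admin.createTopics(Collections.singletonList(
>>>>>>>             new NewTopic("my-app-internal-repartition", 120, (short) 3))).all().get();
>>>>>>>     }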
>>>>>>>
>>>>>>> ======================================
>>>>>>> Concluding thoughts
>>>>>>>
>>>>>>> I know this is a very long email, and I really appreciate you sticking with me this long. I hope it was useful for syncing our mental pictures of this system. Also, you're far more knowledgeable than I am about this system and this domain, so please correct me if I've said anything wrong.
>>>>>>>
>>>>>>> To me the key takeaways are that:
>>>>>>> - KIP-253 satisfies all we need for correctness, since it contains solutions to guarantee that producers can abide by their promises w.r.t. #4 and #6.
>>>>>>> - From Key Point A: #4 is actually optional for KIP-253, but without it we lose a potentially valuable optimization in KStreams (and all other consumer applications).
>>>>>>> - From the Corollary to Point A: without low-level support for partition expansion with backfill, we cannot employ requesting partition expansion as a consumer to improve application performance. In that case, to ensure performance scalability, we would have to discard, for all KStreams applications, the performance optimization afforded by #4.
>>>>>>> - From Key Point B1: after a partition split, we really need to be able to seamlessly continue operating as if it had not split.
>>>>>>> - From Key Point B2: since KIP-253 allows us to maintain all our promises, we have the option of expanding partitions in the topics we produce. Without a backfill operation, though, our consumers may not be able to realize the benefits of that split, if they were hoping to.
>>>>>>>
>>>>>>> In general, faced with the possibility of having to coordinate the partition function with our inputs' producers or with our outputs' consumers, I would personally lean toward overprovisioning and completely avoiding resize for our use case. This doesn't mean that it's not useful in the ecosystem at large without backfill, just that it loses its luster for me. It also means that we can no longer take advantage of some of our current optimizations, and in fact that we must introduce an extra hop of repartitioning on every single input.
>>>>>>>
>>>>>>> I think this is actually a pretty good picture of the opportunities and challenges that other consumers and producers in the Kafka ecosystem will face.
>>>>>>>
>>>>>>> I hope this helps!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 11:51 AM, Jun Rao wrote:
>>>>>>>> Hi, John,
>>>>>>>>
>>>>>>>> I actually think it's important to think through how KStreams handles partition expansion in this KIP. If we do decide that we truly need backfilling, it's much better to think through how to add it now, instead of retrofitting it later. It would be useful to outline how both existing KStreams jobs and new KStreams jobs would work, to see if backfilling is really needed.
>>>>>>>>
>>>>>>>> If we can figure out how KStreams works, at least we have one reference implementation for other stream processing frameworks that face the same issue.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jun
>>>>>>>>
>>>>>>>> On Tue, Mar 27, 2018 at 4:56 PM, John Roesler <john@confluent.io> wrote:
>>>>>>>>> Hi Jun,
>>>>>>>>>
>>>>>>>>> That's a good point.
>>>>>>>>>
>>>>>>>>> Yeah, I don't think it would work too well for existing consumers in the middle of gen 0 to try to switch to a newly backfilled prefix of gen 1. They probably just need to finish up until they get to the end of gen 0 and then transition just as if there were no backfill available yet.
>>>>>>>>>
>>>>>>>>> This isn't terrible, since consumer applications that care about scaling up to match a freshly split partition would wait until after the backfill is available to scale up. The consumer that starts out in gen=0, part=0 is going to be stuck with part=0 and part=3 in gen=1 in my example, regardless of whether they finish scanning gen=0 before or after the backfill is available.
>>>>>>>>>
>>>>>>>>> The broker knowing when it's OK to delete gen 0, including its offset mappings, is a big issue, though. I don't have any immediate ideas for solving it, but it doesn't feel impossible. Hopefully you agree this is outside of KIP-253's scope, so maybe we don't need to worry about it right now.
>>>>>>>>>
>>>>>>>>> I do agree that reshuffling in KStreams effectively solves the scalability problem as well, as it decouples the partition count (and the partition scheme) upstream from the parallelism of the streams application. Likely, we will do this in any case. I'm predominantly advocating for follow-on work to enable backfill for the *other* Kafka users that are not KStreams.
>>>>>>>>>
>>>>>>>>> Thanks for your consideration,
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Tue, Mar 27, 2018 at 6:19 PM, Jun Rao wrote:
>>>>>>>>>> Hi, John,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply. I agree that the backfill approach works more cleanly for newly started consumers. I am just not sure if it's a good primitive to support for existing consumers. One of the challenges that I see is the remapping of the offsets. In your approach, we need to copy the existing records from the partitions in generation 0 to generation 1. Those records will get different offsets in the new generation.
>>>>>>>>>> The broker will have to store those offset mappings somewhere. When the backfill completes, you can delete generation 0's data. However, the broker can't throw away the offset mappings immediately, since it doesn't know whether any existing consumer is still consuming generation 0's records. In a compacted topic, the broker probably can only safely remove the offset mappings when all records in generation 0 have been removed by the cleaner. This may never happen, though.
>>>>>>>>>>
>>>>>>>>>> If we reshuffle the input inside a KStreams job, it obviates the need for offset remapping on the broker.
>>>>>>>>>>
>>>>>>>>>> Jun
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <john@confluent.io> wrote:
>>>>>>>>>>> Hey Dong and Jun,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my replies together to try for a coherent response. I'm not too familiar with mailing-list etiquette, though.
>>>>>>>>>>>
>>>>>>>>>>> I'm going to keep numbering my points because it makes it easy for you all to respond.
>>>>>>>>>>>
>>>>>>>>>>> Point 1:
>>>>>>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers and consumers so that you preserve the correct ordering of records during partition expansion. This is clearly necessary regardless of anything else we discuss. I think this whole discussion about backfill, consumers, streams, etc., is beyond the scope of KIP-253. But it would be cumbersome to start a new thread at this point.
>>>>>>>>>>>
>>>>>>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I think this is a nice addition to the proposal. One thought is that it's actually irrelevant whether the hash function is linear. This is simply an algorithm for moving a key from one partition to another, so the type of hash function need not be a precondition. In fact, it also doesn't matter whether the topic is compacted or not; the algorithm works regardless.
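>>>>>>>>>>>
>>>>>>>>>>> (As I read #9, the mechanics are roughly the following — the topic name and partition numbers are made up. Say key K1's keyspace moved from old partition P1 to new partition P2:)
>>>>>>>>>>>
>>>>>>>>>>>     producer.send(new ProducerRecord<>("topic", p2, k1, value)); // write to the new home
>>>>>>>>>>>     producer.send(new ProducerRecord<>("topic", p1, k1, null));  // retract from the old one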
>>>>>>>>>>>
>>>>>>>>>>> I think this is a good algorithm to keep in mind, as it might solve a variety of problems, but it does have a downside: the producer won't know whether or not K1 was actually in P1; it just knows that K1 was in P1's keyspace before the new epoch. Therefore, it will have to pessimistically send (K1, null) to P1 just in case. But the next time K1 comes along, the producer *also* won't remember that it already retracted K1 from P1, so it will have to send (K1, null) *again*. By extension, every time the producer sends to P2, it will also have to send a tombstone to P1, which is a pretty big burden. To make the situation worse, if there is a second split, say P2 becomes P2 and P3, then any key Kx belonging to P3 will also have to be retracted from P2 *and* P1, since the producer can't know whether Kx was last written to P2 or P1. Over a long period of time, this clearly becomes an issue, as the producer must send an arbitrary number of retractions along with every update.
>>>>>>>>>>>
>>>>>>>>>>> In contrast, the proposed backfill operation has an end, and after it ends, everyone can afford to forget that there ever was a different partition layout.
>>>>>>>>>>>
>>>>>>>>>>> Really, though, figuring out how to split compacted topics is beyond the scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP... We do need in-order delivery during partition expansion. It would be fine by me to say that you *cannot* expand partitions of a log-compacted topic and call it a day. I think it would be better to tackle that in another KIP.
>>>>>>>>>>>
>>>>>>>>>>> Point 2:
>>>>>>>>>>> Regarding whether the consumer re-shuffles its inputs: this is always on the table; any consumer who wants to re-shuffle its input is free to do so.
>>>>>>>>>>> But this is currently not required. It's just that the current high-level story with Kafka encourages the use of partitions as a unit of concurrency. As long as consumers are single-threaded, they can happily consume a single partition without concurrency control of any kind. This is a key aspect of this system that lets folks design high-throughput systems on top of it surprisingly easily. If all consumers were instead encouraged/required to implement a repartition of their own, then the consumer becomes significantly more complex, requiring either that the consumer first produce to its own intermediate repartition topic or that consumer threads have a reliable, high-bandwidth channel of communication with every other consumer thread.
>>>>>>>>>>>
>>>>>>>>>>> Either of those tradeoffs may be reasonable for a particular user of Kafka, but I don't know if we're in a position to say that they are reasonable for *every* user of Kafka.
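>>>>>>>>>>>
>>>>>>>>>>> (The intermediate-repartition-topic variant would look roughly like this — a first-stage consumer re-keys records into a topic the application owns, so downstream consumers see a partitioning they control. Names are made up:)
>>>>>>>>>>>
>>>>>>>>>>>     for (ConsumerRecord<String, String> rec : consumer.poll(100)) {
>>>>>>>>>>>         // Re-produce under our own partitioning; the default partitioner hashes
>>>>>>>>>>>         // the key, so equal keys land together in a partition we control.
>>>>>>>>>>>         producer.send(new ProducerRecord<>("my-repartition-topic", rec.key(), rec.value()));
>>>>>>>>>>>     }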
>>>>>>>>>>>
>>>>>>>>>>> Point 3:
>>>>>>>>>>> Regarding Jun's point about the use case "(3) stateful and maintaining the states in a local store": I agree that they may use a framework *like* Kafka Streams, but that is not the same as using Kafka Streams. This is why I think it's better to solve it in Core: because it is then solved for KStreams and also for everything else that facilitates local state maintenance. To me, Streams is a member of the category of "stream processing frameworks", which is itself a subcategory of "things requiring local state maintenance". I'm not sure it makes sense to assert that Streams is a sufficient and practical replacement for everything in "things requiring local state maintenance".
>>>>>>>>>>>
>>>>>>>>>>> But, yes, I do agree that per-key ordering is an absolute requirement; therefore I think that KIP-253 itself is a necessary step. Regarding the coupling of the state-store partitioning to the topic partitioning: yes, this is an issue we are discussing solutions to right now. We may go ahead and introduce an overpartition layer on our inputs to solve it, but then again, if we get the ability to split partitions with backfill, we may not need to!
>>>>>>>>>>>
>>>>>>>>>>> Point 4:
>>>>>>>>>>> On this:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding thought 2: If we don't care about the stream use-case, then the current KIP probably has already addressed the problem without requiring the consumer to know the partition function. If we care about the stream use-case, we already need coordination across producers of different topics, i.e. the same partition function needs to be used by the producers of topics A and B in order to join topics A and B. Thus, it might be reasonable to extend coordination a bit and say we need coordination across clients (i.e. producer and consumer), such that the consumer knows the partition function used by the producer. If we do so, then we can let the consumer re-copy data for the changelog topic using the same partition function as the producer. This approach has lower overhead compared to having the producer re-copy data of the input topic.
>>>>>>>>>>>> Also, the producer currently does not need to know the data already produced to the topic. If we let the producer split/merge partitions, it would require the producer to consume the existing data, which intuitively is the task of the consumer.
>>>>>>>>>>>
>>>>>>>>>>> I think we do care about use cases *like* Streams; I just don't think we should rely on Streams to implement a feature of Core like partition expansion.
>>>>>>>>>>>
>>>>>>>>>>> Note, though, that we (Streams) do not require coordination across producers.
>>>>>>>>>>> If two topics are certified to be co-partitioned, then Streams apps can make use of that knowledge to optimize their topology (skipping a repartition). But if they don't know whether they are co-partitioned, then they'd better go ahead and repartition within the topology. This is the current state.
>>>>>>>>>>>
>>>>>>>>>>> A huge selling point of Kafka is enabling different parts of loosely coupled organizations to produce and consume data independently. Some coordination between producers and consumers is necessary, like coordinating on the names of topics and their schemas. But Kafka's value proposition w.r.t. ESBs, etc. is inversely proportional to the amount of coordination required. I think it behooves us to be extremely skeptical about introducing any coordination beyond correctness protocols.
>>>>>>>>>>>
>>>>>>>>>>> Asking producers and consumers, or even two different producers, to share code like the partition function is a pretty huge ask. What if they are using different languages?
>>>>>>>>>>>
>>>>>>>>>>> Comparing organizational overhead vs. computational overhead, there are maybe two orders of magnitude of difference between them. In other words, I would happily take on the (linear) overhead of having the producer re-copy the data once during a re-partition in order to save the organizational overhead of tying all the producers and consumers together across multiple boundaries.
>>>>>>>>>>>
>>>>>>>>>>> On that last paragraph: note that the producer *did* know the data it already produced; it handled it the first time around. Asking it to re-produce that data into a new partition layout is squarely within its scope of capabilities. Contrast this with the alternative of asking the consumer to re-partition the data. I think this is even less intuitive, when the partition function belongs to the producer.
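>>>>>>>>>>>
>>>>>>>>>>> (To illustrate the co-partitioning point above in Streams terms: if "topic-a" and "topic-b" are co-partitioned on the same key, the join below needs no extra repartition hop; if not, the application must re-key first. A sketch with made-up topic names, default serdes assumed:)
>>>>>>>>>>>
>>>>>>>>>>>     StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>>>>     KStream<String, String> a = builder.stream("topic-a");
>>>>>>>>>>>     KTable<String, String> b = builder.table("topic-b");
>>>>>>>>>>>     a.join(b, (va, vb) -> va + "," + vb).to("joined-output");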
Point 5:
Dong asked this:

> For the stream use-case that needs to increase the consumer number, the
> existing consumer can backfill the existing data in the change log topic
> to the same change log topic with the new partition number, before the
> new set of consumers bootstraps state from the new partitions of the
> change log topic, right?

In this sense, the "consumer" is actually the producer of the changelog
topic, so if we support partition expansion + backfill as a
producer/broker operation, then it would be very straightforward for
Streams to split a state store. As you say, they would simply instruct the
broker to split the changelog topic's partitions, then backfill. Once the
backfill is ready, they can create a new crop of StandbyTasks to bootstrap
the more granular state stores and finally switch over to them when they
are ready.

But this actually seems to be an argument in favor of split+backfill, so
maybe I missed the point.

You also asked me to explain why copying the "input" topic is better than
copying the "changelog" topic. I think they are totally independent,
actually. For one thing, you can't depend on the existence of a
"changelog" topic in general, only within Streams, but Kafka's user base
clearly exceeds Streams's user base. Plus, you actually also can't depend
on the existence of a changelog topic within Streams, since that is an
optional feature of *some* state store implementations. Even in the
situation where you do have a changelog topic in Streams, there may be use
cases where it makes sense to expand the partitions of just the input, or
just the changelog.
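For illustration, here is a rough sketch of that backfill step driven from
the client side with today's APIs. The broker-side backfill this thread
proposes doesn't exist yet, so the driver itself, the topic name, and the
partition count are all assumptions. After AdminClient#createPartitions
raises the changelog's partition count, the driver re-produces the
pre-split prefix so each key lands under the new scheme, and compaction
later retires the stale copies:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

public class ChangelogBackfill {
    public static void main(String[] args) {
        String changelog = "my-store-changelog"; // assumed topic name
        int oldCount = 4;                        // partitions before the split

        Properties cc = new Properties();
        cc.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        Properties pc = new Properties();
        pc.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                 cc, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                 pc, new ByteArraySerializer(), new ByteArraySerializer())) {

            List<TopicPartition> oldParts = new ArrayList<>();
            for (int p = 0; p < oldCount; p++) {
                oldParts.add(new TopicPartition(changelog, p));
            }
            consumer.assign(oldParts);
            consumer.seekToBeginning(oldParts);
            // Snapshot end offsets: only the prefix written before this
            // point counts as backfill work.
            Map<TopicPartition, Long> end = consumer.endOffsets(oldParts);

            Set<TopicPartition> done = new HashSet<>();
            while (done.size() < oldParts.size()) {
                for (ConsumerRecord<byte[], byte[]> rec :
                         consumer.poll(Duration.ofSeconds(1))) {
                    TopicPartition tp =
                        new TopicPartition(rec.topic(), rec.partition());
                    if (rec.offset() >= end.get(tp)) {
                        continue; // past the snapshot; not part of the prefix
                    }
                    // Re-produce to the same topic; the default partitioner
                    // now hashes each key over the expanded partition count,
                    // so the record lands in its post-split home. Compaction
                    // will drop the stale pre-split copy of the same key.
                    producer.send(new ProducerRecord<>(changelog,
                                                       rec.key(), rec.value()));
                }
                for (TopicPartition tp : oldParts) {
                    if (!done.contains(tp)
                            && consumer.position(tp) >= end.get(tp)) {
                        done.add(tp);
                        consumer.pause(Collections.singletonList(tp));
                    }
                }
            }
            producer.flush();
        }
    }
}

A real broker-side backfill could skip the network round trip entirely,
which is part of the appeal, but the shape of the operation is the same.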
The ask for a Core feature of split+backfill is really about supporting
the use case of splitting partitions in log-compacted topics, regardless
of whether that topic is an "input" or a "changelog" or anything else for
that matter.


Point 6:
On the concern about the performance overhead of copying data between the
brokers, I think it's actually a bit overestimated. Splitting a topic's
partition is probably rare, certainly rarer in general than bootstrapping
new consumers on that topic. If "bootstrapping new consumers" means that
they have to re-shuffle the data before they consume it, then you wind up
copying the same record multiple times:

(broker: input topic) -> (initial consumer) -> (broker: repartition topic)
-> (real consumer)

That's 3x, and it's also 3x for every new record after the split as well,
since you don't get to stop repartitioning/reshuffling once you start.
Whereas if you do a backfill in something like the procedure I outlined,
you only copy the prefix of the partition before the split, and you send
it once to the producer and then once to the new-generation partition.
Plus, assuming we're splitting the partition for the benefit of consumers,
there's no reason we can't co-locate the post-split partitions on the same
host as the pre-split partition, making the second copy a local filesystem
operation.

Even if you follow these two copies up with bootstrapping a new consumer,
it's still rare for this to occur, so you get to amortize these copies
over the lifetime of the topic, whereas a reshuffle just keeps making
copies for every new event.
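For concreteness, the middle hop of that reshuffle path looks roughly like
the sketch below (topic names are made up; the consumer/producer calls are
the plain client API): a process that does nothing but move every record
from the input topic onto a repartition topic, and that has to keep
running for the lifetime of the topic:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

// The middle leg of:
//   (broker: input) -> (this process) -> (broker: repartition topic)
//     -> (real consumer)
// Every record, old and new, pays this extra produce forever.
public class ReshuffleLeg {
    public static void main(String[] args) {
        Properties cc = new Properties();
        cc.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cc.put(ConsumerConfig.GROUP_ID_CONFIG, "reshuffler");
        Properties pc = new Properties();
        pc.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaConsumer<byte[], byte[]> in = new KafkaConsumer<>(
                 cc, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             KafkaProducer<byte[], byte[]> out = new KafkaProducer<>(
                 pc, new ByteArraySerializer(), new ByteArraySerializer())) {
            in.subscribe(Collections.singletonList("input")); // assumed name
            while (true) {
                for (ConsumerRecord<byte[], byte[]> rec :
                         in.poll(Duration.ofSeconds(1))) {
                    // Copy #2 of 3: re-key onto the repartition topic,
                    // whose partition count matches the consumer's needs.
                    out.send(new ProducerRecord<>("input-repartition",
                                                  rec.key(), rec.value()));
                }
            }
        }
    }
}

Contrast this with the backfill sketch above, which terminates once the
pre-split prefix has been copied.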
And finally, I really do think that regardless of any performance concerns
about this operation, if it preserves loose organizational coupling, it is
certainly worth it.


In conclusion:
It might actually be a good idea for us to clarify the scope of KIP-253.
If we're all agreed that it's a good algorithm for allowing in-order
message delivery during partition expansion, then we can continue this
discussion as a new KIP, something like "backfill with partition
expansion". This would let Dong proceed with KIP-253. On the other hand,
if it seems like this conversation may alter the design of KIP-253, then
maybe we *should* just finish working it out.

For my part, my only concern about KIP-253 is the one I raised earlier.

Thanks again, all, for considering these points,
-John


On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindong28@gmail.com> wrote:

> On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindong28@gmail.com> wrote:
>
>> Hey Jan,
>>
>> Thanks for the enthusiasm in improving Kafka's design. Now that I have
>> read through your discussion with Jun, here are my thoughts:
>>
>> - The latest proposal should work with log-compacted topics by properly
>> deleting old messages after a new message with the same key is produced.
>> So it is probably not a concern anymore. Could you comment if there is
>> still an issue?
>>
>> - I wrote the SEP-5 and I am pretty familiar with the motivation