From: Ted Yu
Date: Tue, 5 Dec 2017 11:24:39 -0800
Subject: Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
To: dev@kafka.apache.org

bq. We also have a tunable for total number of cache slots. We never cache
more than this number of incremental fetch sessions.

Is it possible to manage the cache based on heap consumption instead of the
number of slots? It seems heap estimation can be done by counting
PartitionData entries (along with the overhead of the related Map structure).

Cheers

On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe wrote:

> On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Thanks for the response. A couple replies:
> >
> > > I'm a bit ambivalent about letting the client choose the session
> > > timeout. What if clients choose timeouts that are too long? Hmm....
> > > I do agree the timeout should be sized proportional to
> > > max.poll.interval.ms.
> >
> > We have solved this in other cases by letting the broker enforce a
> > maximum timeout.
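Ted's heap-based sizing idea can be made concrete with a rough back-of-the-envelope estimate. A minimal sketch, where the per-entry byte counts are invented for illustration (the real numbers would depend on the JVM and on the final KIP-227 data structures); the class and constant names are hypothetical, not part of any proposal in this thread:

```java
// Sketch: rough heap estimate for one incremental fetch session, counting
// PartitionData entries plus Map overhead, as Ted suggests. All byte
// counts below are ASSUMED placeholder values, not measured figures.
class SessionHeapEstimate {
    static final long BYTES_PER_PARTITION_DATA = 100; // assumed
    static final long BYTES_PER_MAP_ENTRY = 48;       // assumed HashMap.Entry overhead
    static final long FIXED_SESSION_OVERHEAD = 200;   // assumed per-session fixed cost

    /** Estimated heap bytes held by a session caching `partitions` partitions. */
    static long estimateBytes(int partitions) {
        return FIXED_SESSION_OVERHEAD
            + (long) partitions * (BYTES_PER_PARTITION_DATA + BYTES_PER_MAP_ENTRY);
    }

    /** Would admitting a session of this size exceed a total heap budget? */
    static boolean wouldExceedBudget(long usedBytes, int partitions, long budgetBytes) {
        return usedBytes + estimateBytes(partitions) > budgetBytes;
    }
}
```

Under this scheme the broker would admit or evict sessions against a byte budget rather than a fixed slot count, so one MirrorMaker session with thousands of partitions would count for more than many small consumer sessions.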
> > After thinking about it a bit, it's probably overkill in this case
> > since the caching is just an optimization. Instead of stressing over
> > timeouts and such, I am actually wondering if we just need a
> > reasonable session cache eviction policy. For example, when the
> > number of slots is exceeded, perhaps you evict the session with the
> > fewest partitions or the one with the largest interval between
> > fetches. We could give priority to the replicas. Perhaps this might
> > let us get rid of a few of the configs.
>
> I agree that it would be nice to get rid of the tunable for eviction
> time. However, I'm concerned that if we do, we might run into cache
> thrashing. For example, if we have N cache slots and N+1 clients that
> are all fetching continuously, we might have to evict a client on every
> single fetch. It would be much better to give a cache slot to N clients
> and let the last client do full fetch requests.
>
> Perhaps we could mitigate this problem by evicting the smallest fetch
> session-- the one that is for the smallest number of partitions. This
> would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
> to get priority. But then you run into the problem where someone
> fetches a huge number of partitions, and then goes away for a long
> time, and you never reuse that cache memory.
>
> How about this approach? We have a tunable for minimum eviction time
> (default 2 minutes). We cannot evict a client before this timeout has
> expired. We also have a tunable for total number of cache slots. We
> never cache more than this number of incremental fetch sessions.
>
> Sessions become eligible for eviction after 2 minutes, whether or not
> the session is active.
> Fetch Request A will evict Fetch Request B if and only if:
> 1. A has been active in the last 2 minutes and B has not, OR
> 2. A was made by a follower and B was made by a consumer, OR
> 3. A has more partitions than B, OR
> 4. A is newer than B
>
> Then, in a setup where consumers are fetching different numbers of
> partitions, we will eventually converge on giving incremental fetch
> sessions to the big consumers, and not to the small consumers. In a
> setup where consumers are all of equal size but the cache is too small
> for all of them, we still thrash, but slowly. Nobody can be evicted
> before their 2 minutes are up. So in general, the overhead of the
> extra full requests is still low. If someone makes a big request and
> then shuts down, they get cleaned up after 2 minutes, because of
> condition #1. And there are only two tunables needed: cache size and
> eviction time.
>
> > > The main reason is if there is a bug in the incremental fetch
> > > feature.
> >
> > This was in response to my question about removing the consumer
> > config. And sure, any new feature may have bugs, but that's what we
> > have testing for ;). Users can always fall back to a previous version
> > if there are any major problems. As you know, it's tough removing
> > configs once they are there, so I think we should try to add them only
> > if they make sense in the long term.
>
> That's a fair point. I guess if we do need to disable incremental
> fetches in production because of a bug, we can modify the broker
> configuration to do so (by setting 0 cache slots).
>
> best,
> Colin
>
> > Thanks,
> > Jason
> >
> > On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe wrote:
> > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > >> Thanks for the explanation, Colin. A few more questions.
> > > > >>
> > > > >>> The session epoch is not complex. It's just a number which
> > > > >>> increments on each incremental fetch.
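Colin's four eviction rules plus the 2-minute minimum could be sketched as a pure predicate. This is only an illustration of the policy as stated above, not the KIP's actual implementation; the `Session` holder and all names are invented, and the incoming request A is modeled with the same fields as a cached session:

```java
// Sketch of the eviction rules from the message above: B may only be
// evicted after the minimum eviction time, and then A wins by activity,
// follower status, partition count, or recency, in that order.
class EvictionPolicy {
    static final long MIN_EVICTION_AGE_MS = 2 * 60 * 1000; // the "2 minutes" tunable

    static class Session {
        final boolean follower;  // made by a follower (vs. a consumer)
        final int partitions;    // number of partitions in the session
        final long createdMs;    // creation time
        final long lastUsedMs;   // last fetch time
        Session(boolean follower, int partitions, long createdMs, long lastUsedMs) {
            this.follower = follower; this.partitions = partitions;
            this.createdMs = createdMs; this.lastUsedMs = lastUsedMs;
        }
    }

    /** May new request A evict cached session B at time nowMs? */
    static boolean canEvict(Session a, Session b, long nowMs) {
        // Nobody can be evicted before their 2 minutes are up.
        if (nowMs - b.createdMs < MIN_EVICTION_AGE_MS) return false;
        boolean aActive = nowMs - a.lastUsedMs < MIN_EVICTION_AGE_MS;
        boolean bActive = nowMs - b.lastUsedMs < MIN_EVICTION_AGE_MS;
        if (aActive && !bActive) return true;         // rule 1: A active, B idle
        if (a.follower && !b.follower) return true;   // rule 2: follower beats consumer
        if (a.partitions > b.partitions) return true; // rule 3: bigger session wins
        return a.createdMs > b.createdMs;             // rule 4: A is newer than B
    }
}
```

A big idle session thus survives its first 2 minutes but then loses to any active requester, which is how condition #1 cleans up clients that made a big request and shut down.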
> > > > >>> The session epoch is also useful for debugging-- it allows
> > > > >>> you to match up requests and responses when looking at log
> > > > >>> files.
> > > > >> Currently each request in Kafka has a correlation id to help
> > > > >> match the requests and responses. Is epoch doing something
> > > > >> differently?
> > > > > Hi Becket,
> > > > >
> > > > > The correlation ID is used within a single TCP session, to
> > > > > uniquely associate a request with a response. The correlation
> > > > > ID is not unique (and has no meaning) outside the context of
> > > > > that single TCP session.
> > > > >
> > > > > Keep in mind, NetworkClient is in charge of TCP sessions, and
> > > > > generally tries to hide that information from the upper layers
> > > > > of the code. So when you submit a request to NetworkClient,
> > > > > you don't know if that request creates a TCP session, or reuses
> > > > > an existing one.
> > > > >>> Unfortunately, this doesn't work. Imagine the client misses
> > > > >>> an incremental fetch response about a partition. And then
> > > > >>> the partition is never updated after that. The client has no
> > > > >>> way to know about the partition, since it won't be included
> > > > >>> in any future incremental fetch responses. And there are no
> > > > >>> offsets to compare, since the partition is simply omitted
> > > > >>> from the response.
> > > > >> I am curious about the situation in which the follower would
> > > > >> miss a response for a partition. If the entire FetchResponse
> > > > >> is lost (e.g. timeout), the follower would disconnect and
> > > > >> retry. That will result in sending a full FetchRequest.
> > > > > Basically, you are proposing that we rely on TCP for reliable
> > > > > delivery in a distributed system. That isn't a good idea for a
> > > > > bunch of different reasons. First of all, TCP timeouts tend to
> > > > > be very long.
> > > > > So if the TCP session timing out is your error detection
> > > > > mechanism, you have to wait minutes for messages to time out.
> > > > > Of course, we add a timeout on top of that, after which we
> > > > > declare the connection bad and manually close it. But just
> > > > > because the session is closed on one end doesn't mean that the
> > > > > other end knows that it is closed. So the leader may have to
> > > > > wait quite a long time before TCP decides that yes, connection
> > > > > X from the follower is dead and not coming back, even though
> > > > > gremlins ate the FIN packet which the follower attempted to
> > > > > transmit. If the cache state is tied to that TCP session, we
> > > > > have to keep that cache around for a much longer time than we
> > > > > should.
> > > > Hi,
> > > >
> > > > I see this from a different perspective. The cache expiry time
> > > > has the same semantics as the idle connection time in this
> > > > scenario. It is the time range in which we expect the client to
> > > > come back and reuse its broker-side state. I would argue that on
> > > > close we would get an extra shot at cleaning up the session state
> > > > early, as opposed to always waiting for that duration for expiry
> > > > to happen.
> > >
> > > Hi Jan,
> > >
> > > The idea here is that the incremental fetch cache expiry time can
> > > be much shorter than the TCP session timeout. In general the TCP
> > > session timeout is common to all TCP connections, and very long.
> > > To make these numbers a little more concrete, the TCP session
> > > timeout is often configured to be 2 hours on Linux. (See
> > > https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> > > ) The timeout I was proposing for incremental fetch sessions was
> > > one or two minutes at most.
> > >
> > > > > Secondly, from a software engineering perspective, it's not a
> > > > > good idea to try to tightly tie together TCP and our code.
> > > > > We would have to rework how we interact with NetworkClient so
> > > > > that we are aware of things like TCP sessions closing or
> > > > > opening. We would have to be careful to preserve the ordering
> > > > > of incoming messages when doing things like putting incoming
> > > > > requests on to a queue to be processed by multiple threads.
> > > > > It's just a lot of complexity to add, and there's no upside.
> > > > I see the point here. And I had a small chat with Dong Lin
> > > > already, making me aware of this. I tried out the approaches and
> > > > propose the following:
> > > >
> > > > The client starts and does a full fetch. It then does
> > > > incremental fetches. The connection to the broker dies and is
> > > > re-established by NetworkClient under the hood. The broker sees
> > > > an incremental fetch without having state => returns error.
> > > > The client sees the error, does a full fetch, and goes back to
> > > > incrementally fetching.
> > > >
> > > > Having this one additional error round trip is essentially the
> > > > same as when something with the session or epoch changed
> > > > unexpectedly for the client (say expiry).
> > > >
> > > > So it is nothing extra added, but the conditions are easier to
> > > > evaluate, especially since we do everything with NetworkClient.
> > > > Other implementers of the protocol are free to optimize this and
> > > > skip the error round trip on the new connection.
> > > > It is a great plus that the client can know when the error is
> > > > going to happen, instead of the server always having to report
> > > > back when something changes unexpectedly for the client.
> > >
> > > You are assuming that the leader and the follower agree that the
> > > TCP session drops at the same time. When there are network
> > > problems, this may not be true. The leader may still think the
> > > previous TCP session is active.
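Jan's proposed round trip could be sketched as a toy one-slot broker-side dispatcher. The error name `FETCH_SESSION_NOT_FOUND`, the ID allocation, and the one-slot cache are all invented for illustration; the thread does not fix an error code or a cache layout:

```java
// Sketch of Jan's flow: a broker that has no state for an incremental
// fetch answers with an error, and the client then falls back to a full
// fetch (which allocates fresh session state).
class SessionDispatch {
    static final String OK = "OK";
    static final String FETCH_SESSION_NOT_FOUND = "FETCH_SESSION_NOT_FOUND"; // assumed name

    private long cachedSessionId = -1; // -1: no session cached
    private int expectedEpoch = -1;
    private long nextId = 1;

    /** sessionId == 0 marks a full fetch that asks for a new incremental session. */
    String handleFetch(long sessionId, int epoch) {
        if (sessionId == 0) {
            cachedSessionId = nextId++; // allocate and cache new session state
            expectedEpoch = 1;
            return OK;
        }
        if (sessionId != cachedSessionId) {
            // Broker restarted, or the session was evicted: one extra error
            // round trip, after which the client re-sends a full fetch.
            return FETCH_SESSION_NOT_FOUND;
        }
        // A real dispatcher would also validate `epoch` here; omitted in this toy.
        expectedEpoch++;
        return OK;
    }
}
```

The point of the sketch is that losing broker-side state costs exactly one extra round trip, the same price the client already pays when a session expires.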
> > > In that case, we have to keep the incremental fetch session state
> > > around until we learn otherwise (which could be up to that 2 hour
> > > timeout I mentioned). And if we get a new incoming incremental
> > > fetch request, we can't assume that it replaces the previous one,
> > > because the IDs will be different (the new one starts a new
> > > session).
> > >
> > > > > Imagine that I made an argument that client IDs are "complex"
> > > > > and should be removed from our APIs. After all, we can just
> > > > > look at the remote IP address and TCP port of each connection.
> > > > > Would you think that was a good idea? The client ID is useful
> > > > > when looking at logs. For example, if a rebalance is having
> > > > > problems, you want to know what clients were having a problem.
> > > > > So having the client ID field to guide you is actually much
> > > > > less "complex" in practice than not having an ID.
> > > > I still can't follow why the correlation ID will not help here.
> > > > Correlating logs with it usually works great, even with
> > > > primitive tools like grep.
> > >
> > > The correlation ID does help somewhat, but certainly not as much
> > > as a unique 64-bit ID. The correlation ID is not unique in the
> > > broker, just unique to a single NetworkClient. Similarly, the
> > > correlation ID is not unique on the client side, if there are
> > > multiple Consumers, etc.
> > >
> > > > > Similarly, if metadata responses had epoch numbers (simple
> > > > > incrementing numbers), we would not have to debug problems
> > > > > like clients accidentally getting old metadata from servers
> > > > > that had been partitioned off from the network for a while.
> > > > > Clients would know the difference between old and new
> > > > > metadata. So putting epochs into the metadata request is much
> > > > > less "complex" operationally, even though it's an extra field
> > > > > in the request.
> > > > > This has been discussed before on the mailing list.
> > > > >
> > > > > So I think the bottom line for me is that having the session
> > > > > ID and session epoch, while it adds two extra fields, reduces
> > > > > operational complexity and increases debuggability. It avoids
> > > > > tightly coupling us to assumptions about reliable ordered
> > > > > delivery which tend to be violated in practice in multiple
> > > > > layers of the stack. Finally, it avoids the necessity of
> > > > > refactoring NetworkClient.
> > > > So there are stacks out there that violate TCP guarantees? And
> > > > software still works? How can this be? Can you elaborate a
> > > > little on where this can be violated? I am not very familiar
> > > > with virtualized environments, but they can't really violate TCP
> > > > contracts.
> > >
> > > TCP's guarantees of reliable, in-order transmission certainly can
> > > be violated. For example, I once had to debug a cluster where a
> > > certain node had a network card which corrupted its transmissions
> > > occasionally. With all the layers of checksums, you would think
> > > that this was not possible, but it happened. We occasionally got
> > > corrupted data written to disk on the other end because of it.
> > > Even more frustrating, the data was not corrupted on disk on the
> > > sending node-- it was a bug in the network card driver that was
> > > injecting the errors.
> > >
> > > However, my point was not about TCP's guarantees being violated.
> > > My point is that TCP's guarantees are only one small building
> > > block for building a robust distributed system. TCP basically
> > > just says that if you get any bytes from the stream, you will get
> > > the ones that were sent by the sender, in the order they were
> > > sent. TCP does not guarantee that the bytes you send will get
> > > there. It does not guarantee that if you close the connection,
> > > the other end will know about it in a timely fashion.
> > > It does not guarantee that the bytes will be received in a certain
> > > timeframe, and it certainly doesn't guarantee that if you send a
> > > byte on connection X and then on connection Y, the remote end will
> > > read a byte on X before reading a byte on Y.
> > >
> > > best,
> > > Colin
> > >
> > > > Hope this made my view clearer, especially the first part.
> > > >
> > > > Best Jan
> > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >> If there is an error such as NotLeaderForPartition returned
> > > > >> for some partitions, the follower can always send a full
> > > > >> FetchRequest. Is there a scenario where only some of the
> > > > >> partitions in a FetchResponse are lost?
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe wrote:
> > > > >>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > >>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe
> > > > >>>> <cmccabe@apache.org> wrote:
> > > > >>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > >>>>>> Hey Colin,
> > > > >>>>>>
> > > > >>>>>> Thanks much for the update. I have a few questions below:
> > > > >>>>>>
> > > > >>>>>> 1. I am not very sure that we need Fetch Session Epoch.
> > > > >>>>>> It seems that Fetch Session Epoch is only needed to help
> > > > >>>>>> the leader distinguish between "a full fetch request" and
> > > > >>>>>> "a full fetch request and request a new incremental fetch
> > > > >>>>>> session". Alternatively, the follower can also indicate
> > > > >>>>>> "a full fetch request and request a new incremental fetch
> > > > >>>>>> session" by setting Fetch Session ID to -1 without using
> > > > >>>>>> Fetch Session Epoch. Does this make sense?
> > > > >>>>> Hi Dong,
> > > > >>>>>
> > > > >>>>> The fetch session epoch is very important for ensuring
> > > > >>>>> correctness. It prevents corrupted or incomplete fetch
> > > > >>>>> data due to network reordering or loss.
> > > > >>>>>
> > > > >>>>> For example, consider a scenario where the follower sends
> > > > >>>>> a fetch request to the leader. The leader responds, but
> > > > >>>>> the response is lost because of network problems which
> > > > >>>>> affected the TCP session. In that case, the follower must
> > > > >>>>> establish a new TCP session and re-send the incremental
> > > > >>>>> fetch request. But the leader does not know that the
> > > > >>>>> follower didn't receive the previous incremental fetch
> > > > >>>>> response. It is only the incremental fetch epoch which
> > > > >>>>> lets the leader know that it needs to resend that data,
> > > > >>>>> and not data which comes afterwards.
> > > > >>>>>
> > > > >>>>> You could construct similar scenarios with message
> > > > >>>>> reordering, duplication, etc. Basically, this is a
> > > > >>>>> stateful protocol on an unreliable network, and you need
> > > > >>>>> to know whether the follower got the previous data you
> > > > >>>>> sent before you move on. And you need to handle issues
> > > > >>>>> like duplicated or delayed requests. These issues do not
> > > > >>>>> affect the full fetch request, because it is not
> > > > >>>>> stateful-- any full fetch request can be understood and
> > > > >>>>> properly responded to in isolation.
> > > > >>>>>
> > > > >>>> Thanks for the explanation. This makes sense. On the
> > > > >>>> other hand, I would be interested in learning more about
> > > > >>>> whether Becket's solution can help simplify the protocol by
> > > > >>>> not having the epoch field, and whether that is worth
> > > > >>>> doing.
> > > > >>> Hi Dong,
> > > > >>>
> > > > >>> I commented about this in the other thread.
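Colin's lost-response scenario could be illustrated with a leader-side epoch check. This is a sketch of the reasoning above, not the KIP's actual state machine; the enum and method names are invented, and whether the leader resends or instead forces a full fetch is still under discussion in this thread:

```java
// Sketch: the leader compares the epoch in an incoming incremental fetch
// against the epoch it expects next. A repeat of the previous epoch means
// the last response never arrived and its data must be resent.
class EpochCheck {
    enum Action { ADVANCE, RESEND_PREVIOUS, INVALID }

    /**
     * expectedEpoch: the epoch the leader expects next for this session.
     * requestEpoch:  the epoch carried by the incoming incremental fetch.
     */
    static Action onIncrementalFetch(int expectedEpoch, int requestEpoch) {
        if (requestEpoch == expectedEpoch) return Action.ADVANCE;             // normal case
        if (requestEpoch == expectedEpoch - 1) return Action.RESEND_PREVIOUS; // response was lost
        return Action.INVALID; // out-of-order or duplicated request: force a full fetch
    }
}
```

A correlation ID cannot play this role because it is only unique within one TCP session; the epoch survives reconnects, which is exactly the failure case described above.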
> > > > >>> A solution which doesn't maintain session information
> > > > >>> doesn't work here.
> > > > >>>
> > > > >>>>>> 2. It is said that the Incremental FetchRequest will
> > > > >>>>>> include partitions whose fetch offset or maximum number
> > > > >>>>>> of fetch bytes has been changed. If the follower's
> > > > >>>>>> logStartOffset of a partition has changed, should this
> > > > >>>>>> partition also be included in the next FetchRequest to
> > > > >>>>>> the leader? Otherwise, it may affect the handling of
> > > > >>>>>> DeleteRecordsRequest, because the leader may not know
> > > > >>>>>> that the corresponding data has been deleted on the
> > > > >>>>>> follower.
> > > > >>>>> Yeah, the follower should include the partition if the
> > > > >>>>> logStartOffset has changed. That should be spelled out in
> > > > >>>>> the KIP. Fixed.
> > > > >>>>>
> > > > >>>>>> 3. In the section "Per-Partition Data", a partition is
> > > > >>>>>> not considered dirty if its log start offset has changed.
> > > > >>>>>> Later, in the section "FetchRequest Changes", it is said
> > > > >>>>>> that incremental fetch responses will include a partition
> > > > >>>>>> if its logStartOffset has changed. It seems
> > > > >>>>>> inconsistent. Can you update the KIP to clarify it?
> > > > >>>>>
> > > > >>>>> In the "Per-Partition Data" section, it does say that
> > > > >>>>> logStartOffset changes make a partition dirty, though,
> > > > >>>>> right? The first bullet point is:
> > > > >>>>>
> > > > >>>>>> * The LogCleaner deletes messages, and this changes the
> > > > >>>>>> log start offset of the partition on the leader., or
> > > > >>>>>
> > > > >>>> Ah I see. I think I didn't notice this because the
> > > > >>>> statement assumes that the LogStartOffset on the leader
> > > > >>>> only changes due to the LogCleaner.
I= n > > > fact the > > > > >>>> LogStartOffset can change on the leader due to either log > retention > > > and > > > > >>>> DeleteRecordsRequest. I haven't verified whether LogCleaner ca= n > > > change > > > > >>>> LogStartOffset though. It may be a bit better to just say that= a > > > > >>>> partition is considered dirty if LogStartOffset changes. > > > > >>> I agree. It should be straightforward to just resend the > partition > > > if > > > > >>> logStartOffset changes. > > > > >>> > > > > >>>>>> 4. In "Fetch Session Caching" section, it is said that each > broker > > > > >>> has a > > > > >>>>>> limited number of slots. How is this number determined? Does > this > > > > >>> require > > > > >>>>>> a new broker config for this number? > > > > >>>>> Good point. I added two broker configuration parameters to > control > > > > >>> this > > > > >>>>> number. > > > > >>>>> > > > > >>>> I am curious to see whether we can avoid some of these new > configs. > > > For > > > > >>>> example, incremental.fetch.session.cache.slots.per.broker is > > > probably > > > > >>> not > > > > >>>> necessary because if a leader knows that a FetchRequest comes > from a > > > > >>>> follower, we probably want the leader to always cache the > > > information > > > > >>>> from that follower. Does this make sense? > > > > >>> Yeah, maybe we can avoid having > > > > >>> incremental.fetch.session.cache.slots.per.broker. > > > > >>> > > > > >>>> Maybe we can discuss the config later after there is agreement > on > > > how the > > > > >>>> protocol would look like. > > > > >>>> > > > > >>>> > > > > >>>>>> What is the error code if broker does > > > > >>>>>> not have new log for the incoming FetchRequest? > > > > >>>>> Hmm, is there a typo in this question? Maybe you meant to as= k > what > > > > >>>>> happens if there is no new cache slot for the incoming > > > FetchRequest? 
> > > > >>>>> That's not an error-- the incremental fetch session ID
> > > > >>>>> just gets set to 0, indicating no incremental fetch
> > > > >>>>> session was created.
> > > > >>>>>
> > > > >>>> Yeah, there is a typo. You have answered my question.
> > > > >>>>
> > > > >>>>>> 5. Can you clarify what happens if the follower adds a
> > > > >>>>>> partition to the ReplicaFetcherThread after receiving
> > > > >>>>>> LeaderAndIsrRequest? Does the leader need to generate a
> > > > >>>>>> new session for this ReplicaFetcherThread, or does it
> > > > >>>>>> re-use the existing session? If it uses a new session,
> > > > >>>>>> is the old session actively deleted from the slot?
> > > > >>>>> The basic idea is that you can't make changes, except by
> > > > >>>>> sending a full fetch request. However, perhaps we can
> > > > >>>>> allow the client to re-use its existing session ID. If
> > > > >>>>> the client sets sessionId = id, epoch = 0, it could
> > > > >>>>> re-initialize the session.
> > > > >>>>>
> > > > >>>> Yeah, I agree with the basic idea. We probably want to
> > > > >>>> understand more detail about how this works later.
> > > > >>> Sounds good. I updated the KIP with this information. A
> > > > >>> re-initialization should be exactly the same as an
> > > > >>> initialization, except that it reuses an existing ID.
> > > > >>>
> > > > >>> best,
> > > > >>> Colin
> > > > >>>
> > > > >>>>>> BTW, I think it may be useful if the KIP can include an
> > > > >>>>>> example workflow of how this feature will be used in the
> > > > >>>>>> case of a partition change and so on.
> > > > >>>>> Yeah, that might help.
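The sessionId/epoch combinations discussed above (a new session, a re-initialization that reuses an ID, and a normal incremental fetch) could be classified as follows. The string labels and the helper are invented for illustration, not the KIP's wire format:

```java
// Sketch: interpreting the (sessionId, epoch) pair on an incoming
// FetchRequest, per the convention discussed in the thread above.
class FetchRequestKind {
    static String classify(long sessionId, int epoch) {
        if (sessionId == 0) return "FULL_FETCH_NEW_SESSION";      // broker may assign a session
        if (epoch == 0) return "FULL_FETCH_REINITIALIZE";         // reuses the existing ID
        return "INCREMENTAL_FETCH";                               // continues the session
    }
}
```

Re-initialization behaving exactly like initialization (modulo the reused ID) keeps the broker-side handling to a single code path.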
> > > > >>>>>
> > > > >>>>> best,
> > > > >>>>> Colin
> > > > >>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> Dong
> > > > >>>>>>
> > > > >>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe
> > > > >>>>>> <cmccabe@apache.org> wrote:
> > > > >>>>>>> I updated the KIP with the ideas we've been discussing.
> > > > >>>>>>>
> > > > >>>>>>> best,
> > > > >>>>>>> Colin
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > >>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > >>>>>>>>> Hi Colin, thank you for this KIP, it can become a
> > > > >>>>>>>>> really useful thing.
> > > > >>>>>>>>> I just scanned through the discussion so far and
> > > > >>>>>>>>> wanted to start a thread to make a decision about
> > > > >>>>>>>>> keeping the cache with the Connection / Session, or
> > > > >>>>>>>>> having some sort of UUID-indexed global Map.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Sorry if that has been settled already and I missed
> > > > >>>>>>>>> it. In this case could anyone point me to the
> > > > >>>>>>>>> discussion?
> > > > >>>>>>>> Hi Jan,
> > > > >>>>>>>>
> > > > >>>>>>>> I don't think anyone has discussed the idea of tying
> > > > >>>>>>>> the cache to an individual TCP session yet. I agree
> > > > >>>>>>>> that since the cache is intended to be used only by a
> > > > >>>>>>>> single follower or client, it's an interesting thing to
> > > > >>>>>>>> think about.
> > > > >>>>>>>>
> > > > >>>>>>>> I guess the obvious disadvantage is that whenever your
> > > > >>>>>>>> TCP session drops, you have to make a full fetch
> > > > >>>>>>>> request rather than an incremental one. It's not clear
> > > > >>>>>>>> to me how often this happens in practice -- it probably
> > > > >>>>>>>> depends a lot on the quality of the network.
Fro= m > a > > > > >>> code > > > > >>>>>>>> perspective, it might also be a bit difficult to access da= ta > > > > >>>>> associated > > > > >>>>>>>> with the Session from classes like KafkaApis (although we > could > > > > >>>>> refactor > > > > >>>>>>>> it to make this easier). > > > > >>>>>>>> > > > > >>>>>>>> It's also clear that even if we tie the cache to the > session, we > > > > >>>>> still > > > > >>>>>>>> have to have limits on the number of caches we're willing = to > > > > >>> create. > > > > >>>>>>>> And probably we should reserve some cache slots for each > > > > >>> follower, so > > > > >>>>>>>> that clients don't take all of them. > > > > >>>>>>>> > > > > >>>>>>>>> Id rather see a protocol in which the client is hinting t= he > > > > >>> broker > > > > >>>>>>> that, > > > > >>>>>>>>> he is going to use the feature instead of a client > > > > >>>>>>>>> realizing that the broker just offered the feature > (regardless > > > > >>> of > > > > >>>>>>>>> protocol version which should only indicate that the > feature > > > > >>>>>>>>> would be usable). > > > > >>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do think > that > > > > >>> the > > > > >>>>>>>> server should have the option of not accepting incremental > > > > >>> requests > > > > >>>>> from > > > > >>>>>>>> specific clients, in order to save memory space. > > > > >>>>>>>> > > > > >>>>>>>>> This seems to work better with a per > > > > >>>>>>>>> connection/session attached Metadata than with a Map and > could > > > > >>>>> allow > > > > >>>>>>> for > > > > >>>>>>>>> easier client implementations. > > > > >>>>>>>>> It would also make Client-side code easier as there > wouldn't > > > > >>> be any > > > > >>>>>>>>> Cache-miss error Messages to handle. > > > > >>>>>>>> It is nice not to have to handle cache-miss responses, I > agree. > > > > >>>>>>>> However, TCP sessions aren't exposed to most of our > client-side > > > > >>> code. 
> > > > >>>>>>>> For example, when the Producer creates a message and
> > > > >>>>>>>> hands it off to the NetworkClient, the NC will
> > > > >>>>>>>> transparently re-connect and re-send a message if the
> > > > >>>>>>>> first send failed. The higher-level code will not be
> > > > >>>>>>>> informed about whether the TCP session was
> > > > >>>>>>>> re-established, whether an existing TCP session was
> > > > >>>>>>>> used, and so on. So overall I would still lean towards
> > > > >>>>>>>> not coupling this to the TCP session...
> > > > >>>>>>>>
> > > > >>>>>>>> best,
> > > > >>>>>>>> Colin
> > > > >>>>>>>>
> > > > >>>>>>>>> Thank you again for the KIP. And again, if this was
> > > > >>>>>>>>> clarified already, please drop me a hint where I could
> > > > >>>>>>>>> read about it.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best Jan
> > > > >>>>>>>>>
> > > > >>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > >>>>>>>>>> Hi all,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I created a KIP to improve the scalability and
> > > > >>>>>>>>>> latency of FetchRequest:
> > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > >>>>>>>>>> Please take a look.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> cheers,
> > > > >>>>>>>>>> Colin