From: Colin McCabe
To: dev@kafka.apache.org
Date: Wed, 03 Jan 2018 09:37:26 -0800
Subject: Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

On Tue, Jan 2, 2018, at 23:49, Becket Qin wrote:
> Thanks for the reply, Colin.
>
> My concern for the reinitialization is potential churn rather than
> efficiency. The current KIP proposal uses time- and priority-based
> protection to avoid thrashing, but it is not clear to me whether that is
> sufficient. For example, consider topic creation/deletion. In those cases,
> a lot of the replica fetchers will potentially need to re-establish the
> session, and many client sessions may get evicted and thus again need to
> re-establish sessions.
> This would involve two round trips (due to InvalidFetchSessionException),
> potential metadata refresh and backoff.

Hi Becket,

When a fetcher is adding or removing a partition, it can re-use its
existing fetch session. There is no cache churn, and nobody gets evicted,
in this case. The fetcher just has to send a full fetch request to
establish what it wants the new partition set to be.

best,
Colin

>
> Admittedly it is probably not going to be worse than what we have now, but
> such uncertain impact still worries me. Are we going to have the follow-up
> optimization discussion before the implementation of this KIP, or are we
> going to do it after? In the past we used to have separate KIPs for a
> complicated feature but implement them together. Perhaps we can do the same
> here if you want to limit the scope of this KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Jan 2, 2018 at 6:34 PM, Colin McCabe wrote:
>
> > On Tue, Jan 2, 2018, at 04:46, Becket Qin wrote:
> > > Hi Colin,
> > >
> > > Good point about KIP-226. Maybe a separate broker epoch is needed,
> > > although it is a little awkward to let the consumer set this. So was
> > > there a solution to the frequent pause and resume scenario? Did I miss
> > > something?
> > >
> > > Thanks,
> > > Jiangjie (Becket) Qin
> >
> > Hi Becket,
> >
> > Allowing sessions to be re-initialized (as the current KIP does) makes
> > frequent pauses and resumes less painful, because the memory associated
> > with the old session can be reclaimed. The only cost is sending a full
> > fetch request once when the pause or resume is activated.
> >
> > There are other follow-on optimizations that we might want to do later,
> > like allowing new partitions to be added to existing fetch sessions
> > without a re-initialization, that could make this even more efficient.
> > But that's not in the current KIP, in order to avoid expanding the scope
> > too much.
> >
> > best,
> > Colin
> >
> > >
> > > On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe wrote:
> > >
> > > > On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > > > > Hi Colin,
> > > > >
> > > > > Thanks for the explanation. I want to clarify a bit more on my
> > > > > thoughts.
> > > > >
> > > > > I am fine with having a separate discussion as long as the
> > > > > follow-up discussion will be incremental on top of this KIP
> > > > > instead of overriding the protocol in this KIP.
> > > >
> > > > Hi Becket,
> > > >
> > > > Thanks for the clarification. I do think that the changes we've been
> > > > discussing would be incremental rather than completely replacing what
> > > > we've talked about here. See my responses inline below.
> > > >
> > > > > I completely agree this KIP is useful by itself. That being said,
> > > > > we want to avoid falling into a "local optimal" solution just
> > > > > because it solves the problem in this scope. I think we should also
> > > > > think about whether the solution aligns with a "global optimal"
> > > > > (systemically optimal) solution as well. That is why I brought up
> > > > > other considerations. If they turn out to be orthogonal and should
> > > > > be addressed separately, that's good. But at least it is worth
> > > > > thinking about the potential connections between those things.
> > > > >
> > > > > One example of such a related consideration is the following two
> > > > > seemingly unrelated things:
> > > > >
> > > > > 1.
I might have missed the discussion, but it seems the concern of > > the > > > > > clients doing frequent pause and resume is still not addressed. Since > > > > this > > > > > is a pretty common use case for applications that want to have flow > > > > > control, or have prioritized consumption, or get consumption > > fairness, we > > > > > probably want to see how to handle this case. One of the solution > > might > > > > be > > > > > a long-lived session id spanning the clients' life time. > > > > > > > > > > 2. KAFKA-6029. The key problem is that the leader wants to know if a > > > > fetch > > > > > request is from a shutting down broker or from a restarted broker. > > > > > > > > > > The connection between those two issues is that both of them could be > > > > > addressed by having a life-long session id for each client (or > > fetcher, > > > > to > > > > > be more accurate). This may indicate that having a life long session > > id > > > > > might be a "global optimal" solution so it should be considered in > > this > > > > > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either > > > > > introduce a broker epoch unnecessarily (which will not be used by the > > > > > consumers at all) or override what we do in this KIP. > > > > > > > > Remember that a given follower will have more than one fetch session > > ID. > > > > Each fetcher thread will have its own session ID. And we will > > eventually > > > > be able to dynamically add or remove fetcher threads using KIP-226. > > > > Therefore, we can't use fetch session IDs to uniquely identify a given > > > > broker incarnation. Any time we increase the number of fetcher > > threads, a > > > > new fetch session ID will show up. > > > > > > > > If we want to know if a fetch request is from a shutting down broker or > > > > from a restarted broker, the most straightforward and robust way would > > > > probably be to add an incarnation number for each broker. ZK can track > > > > this number. This also helps with debugging and logging (you can tell > > > > "aha-- this request came from the second incarnation, not the first." > > > > > > > > > BTW, to clarify, the main purpose of returning the data at the index > > > > > boundary was to get the same benefit of efficient incremental fetch > > for > > > > > both low vol and high vol partitions, which is directly related to > > the > > > > > primary goal in this KIP. The other things (such as avoiding binary > > > > search) > > > > > are just potential additional gain, and they are also brought up to > > see > > > > if > > > > > that could be a "global optimal" solution. > > > > > > > > I still think these are separate. The primary goal of the KIP was to > > make > > > > fetch requests where not all partitions are returning data more > > efficient. > > > > This isn't really related to the goal of trying to make accessing > > > > historical data more efficient. In most cases, the data we're > > accessing is > > > > very recent data, and index lookups are not an issue. > > > > > > > > > > > > > > Some other replies below. > > > > > >In order for improvements to succeed, I think that it's important to > > > > > clearly define the scope and goals. One good example of this was the > > > > > AdminClient KIP. We deliberately avoiding ?>discussing new > > > > administrative > > > > > RPCs in that KIP, in order to limit the scope. This kept the > > discussion > > > > > focused on the user interfaces and configuration, rather than on the > > > > > details of possible >new RPCs. 
Once the KIP was completed, it was > > easy > > > > for > > > > > us to add new RPCs later in separate KIPs. > > > > > Hmm, why is AdminClient is related? All the discussion are about how > > to > > > > > make fetch more efficient, right? > > > > > > > > > > >Finally, it's not clear that the approach you are proposing is the > > right > > > > > way to go. I think we would need to have a lot more discussion > > about it. > > > > > One very big disadvantage is that it couples >what we send back on > > the > > > > wire > > > > > tightly to what is on the disk. It's not clear that we want to do > > that. > > > > > What if we want to change how things are stored in the future? How > > does > > > > > this work with >clients' own concept of fetch sizes? And so on, and > > so > > > > > on. This needs its own discussion thread. > > > > > That might be true. However, the index file by definition is for the > > > > files > > > > > stored on the disk. So if we decide to change the storage layer to > > > > > something else, it seems natural to use some other suitable ways to > > get > > > > the > > > > > offsets efficiently. > > > > > > > > > > >There are a lot of simpler solutions that might work as well or > > better. > > > > > For example, each partition could keep an in-memory LRU cache of the > > most > > > > > recently used offset to file position >mappings. Or we could have a > > > > thread > > > > > periodically touch the latest page or two of memory in the index > > file for > > > > > each partition, to make sure that it didn't fall out of the cache. > > In > > > > some > > > > > offline >discussions, some of these approaches have looked quite > > > > > promising. I've even seen some good performance numbers for > > prototypes. > > > > > In any case, it's a separate problem which needs its >own KIP, I > > think. > > > > > Those are indeed separate discussions. I was not intended to discuss > > them > > > > > in this KIP. Sorry about the confusion. > > > > > > > > > > Thanks and Merry Christmas, > > > > > > > > Happy new year. Sorry if some of my responses are delayed (I'm on > > > > vacation). > > > > > > > > cheers, > > > > Colin > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > On Sat, Dec 23, 2017 at 1:16 AM, Colin McCabe > > > > wrote: > > > > > > > > > > > On Fri, Dec 22, 2017, at 14:31, Becket Qin wrote: > > > > > > > >> > > > > > > > >> The point I want to make is that avoiding doing binary search > > on > > > > index > > > > > > > >> file and avoid reading the log segments during fetch has some > > > > > > additional > > > > > > > >> benefits. So if the solution works for the current KIP, it > > might > > > > be a > > > > > > > >> better choice. > > > > > > > > > > > > > > >Let's discuss this in a follow-on KIP. > > > > > > > > > > > > > > If the discussion will potentially change the protocol in the > > current > > > > > > > proposal. Would it be better to discuss it now instead of in a > > > > follow-up > > > > > > > KIP so we don't have some protocol that immediately requires a > > > > change. > > > > > > > > > > > > Hi Becket, > > > > > > > > > > > > I think that the problem that you are discussing is different than > > the > > > > > > problem this KIP is designed to address. This KIP is targeted at > > > > > > eliminating the wastefulness of re-transmitting information about > > > > > > partitions that haven't changed in every FetchRequest and > > > > FetchResponse. 
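To make the "only re-send what changed" idea concrete, here is a rough, hypothetical sketch of how a fetcher could decide which partitions belong in an incremental fetch request. The class and field names (PartitionState, dirtyPartitions) are assumptions for illustration, not the KIP's actual data structures:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pick out only the "dirty" partitions for an incremental fetch.
// A partition is dirty if its fetch offset, log start offset, or max bytes changed
// since the last request sent on this session.
class IncrementalFetchBuilder {
    static class PartitionState {
        long fetchOffset;
        long logStartOffset;
        int maxBytes;

        PartitionState(long fetchOffset, long logStartOffset, int maxBytes) {
            this.fetchOffset = fetchOffset;
            this.logStartOffset = logStartOffset;
            this.maxBytes = maxBytes;
        }

        boolean differsFrom(PartitionState other) {
            return other == null
                || fetchOffset != other.fetchOffset
                || logStartOffset != other.logStartOffset
                || maxBytes != other.maxBytes;
        }
    }

    // Compare what we want to fetch now against what the session last sent,
    // and return only the partitions that changed. Unchanged partitions are
    // omitted entirely, which is the bandwidth saving this KIP is after.
    static <P> List<P> dirtyPartitions(Map<P, PartitionState> wanted,
                                       Map<P, PartitionState> lastSent) {
        List<P> dirty = new ArrayList<>();
        for (Map.Entry<P, PartitionState> e : wanted.entrySet()) {
            if (e.getValue().differsFrom(lastSent.get(e.getKey()))) {
                dirty.add(e.getKey());
            }
        }
        return dirty;
    }
}

Partitions removed from the fetch set would also have to be signalled to the broker; that detail is omitted here for brevity.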
> > > > > > The problem you are discussing is dealing with situations where the > > > > index > > > > > > file or the data file is not in the page cache, and therefore we > > take a > > > > > > page fault when doing an index lookup. > > > > > > > > > > > > This KIP is useful and valuable on its own. For example, if you > > have > > > > > > brokers in a public cloud in different availability zones, you may > > > > wish to > > > > > > minimize the network traffic between them. Therefore, you don't > > want > > > > every > > > > > > FetchRequest between brokers to be a full FetchRequest. In that > > case, > > > > this > > > > > > KIP is very valuable. > > > > > > > > > > > > In order for improvements to succeed, I think that it's important > > to > > > > > > clearly define the scope and goals. One good example of this was > > the > > > > > > AdminClient KIP. We deliberately avoiding discussing new > > > > administrative > > > > > > RPCs in that KIP, in order to limit the scope. This kept the > > > > discussion > > > > > > focused on the user interfaces and configuration, rather than on > > the > > > > > > details of possible new RPCs. Once the KIP was completed, it was > > easy > > > > for > > > > > > us to add new RPCs later in separate KIPs. > > > > > > > > > > > > While it's clear that there is probably even more we could do to > > > > optimize > > > > > > fetch requests, making them incremental seems like a good first > > cut. I > > > > > > deliberately avoided changing the replication protocol in this KIP, > > > > because > > > > > > I think that it's a big enough change as-is. If we want to change > > the > > > > > > replication protocol in the future, there is nothing preventing > > us... > > > > and > > > > > > this change will be a useful starting point. > > > > > > > > > > > > Finally, it's not clear that the approach you are proposing is the > > > > right > > > > > > way to go. I think we would need to have a lot more discussion > > about > > > > it. > > > > > > One very big disadvantage is that it couples what we send back on > > the > > > > wire > > > > > > tightly to what is on the disk. It's not clear that we want to do > > > > that. > > > > > > What if we want to change how things are stored in the future? How > > > > does > > > > > > this work with clients' own concept of fetch sizes? And so on, > > and so > > > > on. > > > > > > This needs its own discussion thread. > > > > > > > > > > > > There are a lot of simpler solutions that might work as well or > > better. > > > > > > For example, each partition could keep an in-memory LRU cache of > > the > > > > most > > > > > > recently used offset to file position mappings. Or we could have a > > > > thread > > > > > > periodically touch the latest page or two of memory in the index > > file > > > > for > > > > > > each partition, to make sure that it didn't fall out of the > > cache. In > > > > some > > > > > > offline discussions, some of these approaches have looked quite > > > > promising. > > > > > > I've even seen some good performance numbers for prototypes. In > > any > > > > case, > > > > > > it's a separate problem which needs its own KIP, I think. > > > > > > > > > > > > best, > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe > > > > > > wrote: > > > > > > > > > > > > > > > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote: > > > > > > > > > Sorry for coming back at this so late. 
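As an aside on the "in-memory LRU cache of the most recently used offset to file position mappings" idea floated above: a minimal sketch of such a per-partition cache, assuming a plain LinkedHashMap in access order (illustrative only, not existing broker code):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a per-partition LRU cache mapping message offsets to file
// positions, so hot lookups can skip the binary search on the index file.
class OffsetPositionCache extends LinkedHashMap<Long, Long> {
    private final int maxEntries;

    OffsetPositionCache(int maxEntries) {
        // accessOrder=true turns the map into an LRU: get() moves the entry to the tail.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
        return size() > maxEntries;
    }
}

A fetch path could consult this cache first and fall back to the index file's binary search on a miss, inserting the looked-up position afterwards.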
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 11.12.2017 07:12, Colin McCabe wrote: > > > > > > > > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote: > > > > > > > > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote: > > > > > > > > > >>> Hi, > > > > > > > > > >>> > > > > > > > > > >>> sorry for the late reply, busy times :-/ > > > > > > > > > >>> > > > > > > > > > >>> I would ask you one thing maybe. Since the timeout > > > > > > > > > >>> argument seems to be settled I have no further argument > > > > > > > > > >>> form your side except the "i don't want to". > > > > > > > > > >>> > > > > > > > > > >>> Can you see that connection.max.idle.max is the exact > > time > > > > > > > > > >>> that expresses "We expect the client to be away for this > > > > long, > > > > > > > > > >>> and come back and continue"? > > > > > > > > > >> Hi Jan, > > > > > > > > > >> > > > > > > > > > >> Sure, connection.max.idle.max is the exact time that we > > want > > > > to > > > > > > keep > > > > > > > > > >> around a TCP session. TCP sessions are relatively cheap, > > so > > > > we > > > > > > can > > > > > > > > > >> afford to keep them around for 10 minutes by default. > > > > Incremental > > > > > > > > fetch > > > > > > > > > >> state is less cheap, so we want to set a shorter timeout > > for > > > > it. > > > > > > We > > > > > > > > > >> also want new TCP sessions to be able to reuse an existing > > > > > > incremental > > > > > > > > > >> fetch session rather than creating a new one and waiting > > for > > > > the > > > > > > old > > > > > > > > one > > > > > > > > > >> to time out. > > > > > > > > > >> > > > > > > > > > >>> also clarified some stuff inline > > > > > > > > > >>> > > > > > > > > > >>> Best Jan > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> On 05.12.2017 23:14, Colin McCabe wrote: > > > > > > > > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote: > > > > > > > > > >>>>> Hi Colin > > > > > > > > > >>>>> > > > > > > > > > >>>>> Addressing the topic of how to manage slots from the > > other > > > > > > thread. > > > > > > > > > >>>>> With tcp connections all this comes for free > > essentially. > > > > > > > > > >>>> Hi Jan, > > > > > > > > > >>>> > > > > > > > > > >>>> I don't think that it's accurate to say that cache > > > > management > > > > > > > > "comes for > > > > > > > > > >>>> free" by coupling the incremental fetch session with > > the TCP > > > > > > > > session. > > > > > > > > > >>>> When a new TCP session is started by a fetch request, > > you > > > > still > > > > > > > > have to > > > > > > > > > >>>> decide whether to grant that request an incremental > > fetch > > > > > > session or > > > > > > > > > >>>> not. If your answer is that you always grant the > > request, I > > > > > > would > > > > > > > > argue > > > > > > > > > >>>> that you do not have cache management. > > > > > > > > > >>> First I would say, the client has a big say in this. If > > the > > > > > > client > > > > > > > > > >>> is not going to issue incremental he shouldn't ask for a > > > > cache > > > > > > > > > >>> when the client ask for the cache we still have all > > options > > > > to > > > > > > deny. > > > > > > > > > >> To put it simply, we have to have some cache management > > above > > > > and > > > > > > > > beyond > > > > > > > > > >> just giving out an incremental fetch session to anyone who > > > > has a > > > > > > TCP > > > > > > > > > >> session. 
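To illustrate the cache-management point above, here is a rough sketch, under assumed names, of a slot-limited session cache that only evicts the least recently used session when it is older than a minimum age -- the kind of time-based protection against thrashing mentioned earlier in the thread:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a bounded cache of incremental fetch sessions. A new session may
// evict the least recently used existing session, but only if that session has
// not been touched for at least minAgeMs -- time-based protection against thrashing.
class FetchSessionCache {
    static class Session {
        final int id;
        long lastUsedMs;
        Session(int id, long nowMs) { this.id = id; this.lastUsedMs = nowMs; }
    }

    private final int maxSlots;
    private final long minAgeMs;
    // access-order LinkedHashMap keeps the least recently used session first
    private final LinkedHashMap<Integer, Session> sessions =
        new LinkedHashMap<>(16, 0.75f, true);
    private int nextId = 1;

    FetchSessionCache(int maxSlots, long minAgeMs) {
        this.maxSlots = maxSlots;
        this.minAgeMs = minAgeMs;
    }

    // Try to create a session; return it, or null if no slot could be freed.
    synchronized Session maybeCreate(long nowMs) {
        if (sessions.size() >= maxSlots && !tryEvict(nowMs)) {
            return null;  // caller falls back to sessionless (full) fetches
        }
        Session s = new Session(nextId++, nowMs);
        sessions.put(s.id, s);
        return s;
    }

    private boolean tryEvict(long nowMs) {
        Iterator<Map.Entry<Integer, Session>> it = sessions.entrySet().iterator();
        if (!it.hasNext()) return false;
        Session lru = it.next().getValue();
        if (nowMs - lru.lastUsedMs < minAgeMs) return false;  // protected, don't thrash
        it.remove();
        return true;
    }
}

A priority rule (for example, preferring replica fetchers over ordinary consumers when deciding what to evict) could be layered on top of the same structure.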
Therefore, caching does not become simpler if > > you > > > > > > couple the > > > > > > > > > >> fetch session to the TCP session. > > > > > > > > > Simply giving out an fetch session for everyone with a > > > > connection is > > > > > > too > > > > > > > > > simple, > > > > > > > > > but I think it plays well into the idea of consumers > > choosing to > > > > use > > > > > > the > > > > > > > > > feature > > > > > > > > > therefore only enabling where it brings maximum gains > > > > > > > > > (replicas,MirrorMakers) > > > > > > > > > >> > > > > > > > > > >>>> I guess you could argue that timeouts are cache > > management, > > > > but > > > > > > I > > > > > > > > don't > > > > > > > > > >>>> find that argument persuasive. Anyone could just > > create a > > > > lot > > > > > > of > > > > > > > > TCP > > > > > > > > > >>>> sessions and use a lot of resources, in that case. So > > > > there is > > > > > > > > > >>>> essentially no limit on memory use. In any case, TCP > > > > sessions > > > > > > don't > > > > > > > > > >>>> help us implement fetch session timeouts. > > > > > > > > > >>> We still have all the options denying the request to > > keep the > > > > > > state. > > > > > > > > > >>> What you want seems like a max connections / ip > > safeguard. > > > > > > > > > >>> I can currently take down a broker with to many > > connections > > > > > > easily. > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>>>> I still would argue we disable it by default and make a > > > > flag > > > > > > in the > > > > > > > > > >>>>> broker to ask the leader to maintain the cache while > > > > > > replicating > > > > > > > > and also only > > > > > > > > > >>>>> have it optional in consumers (default to off) so one > > can > > > > turn > > > > > > it > > > > > > > > on > > > > > > > > > >>>>> where it really hurts. MirrorMaker and audit consumers > > > > > > > > prominently. > > > > > > > > > >>>> I agree with Jason's point from earlier in the thread. > > > > Adding > > > > > > extra > > > > > > > > > >>>> configuration knobs that aren't really necessary can > > harm > > > > > > usability. > > > > > > > > > >>>> Certainly asking people to manually turn on a feature > > > > "where it > > > > > > > > really > > > > > > > > > >>>> hurts" seems to fall in that category, when we could > > easily > > > > > > enable > > > > > > > > it > > > > > > > > > >>>> automatically for them. > > > > > > > > > >>> This doesn't make much sense to me. > > > > > > > > > >> There are no tradeoffs to think about from the client's > > point > > > > of > > > > > > view: > > > > > > > > > >> it always wants an incremental fetch session. So there > > is no > > > > > > benefit > > > > > > > > to > > > > > > > > > >> making the clients configure an extra setting. Updating > > and > > > > > > managing > > > > > > > > > >> client configurations is also more difficult than managing > > > > broker > > > > > > > > > >> configurations for most users. > > > > > > > > > >> > > > > > > > > > >>> You also wanted to implement > > > > > > > > > >>> a "turn of in case of bug"-knob. Having the client > > indicate > > > > if > > > > > > the > > > > > > > > > >>> feauture will be used seems reasonable to me., > > > > > > > > > >> True. However, if there is a bug, we could also roll > > back the > > > > > > client, > > > > > > > > > >> so having this configuration knob is not strictly > > required. 
> > > > > > > > > >> > > > > > > > > > >>>>> Otherwise I left a few remarks in-line, which should > > help > > > > to > > > > > > > > understand > > > > > > > > > >>>>> my view of the situation better > > > > > > > > > >>>>> > > > > > > > > > >>>>> Best Jan > > > > > > > > > >>>>> > > > > > > > > > >>>>> > > > > > > > > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote: > > > > > > > > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > > > > > > > > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote: > > > > > > > > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > > > > > > > > > >>>>>>>>> Thanks for the explanation, Colin. A few more > > > > questions. > > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>>> The session epoch is not complex. It's just a > > number > > > > > > which > > > > > > > > increments > > > > > > > > > >>>>>>>>>> on each incremental fetch. The session epoch is > > also > > > > > > useful > > > > > > > > for > > > > > > > > > >>>>>>>>>> debugging-- it allows you to match up requests and > > > > > > responses > > > > > > > > when > > > > > > > > > >>>>>>>>>> looking at log files. > > > > > > > > > >>>>>>>>> Currently each request in Kafka has a correlation > > id to > > > > > > help > > > > > > > > match the > > > > > > > > > >>>>>>>>> requests and responses. Is epoch doing something > > > > > > differently? > > > > > > > > > >>>>>>>> Hi Becket, > > > > > > > > > >>>>>>>> > > > > > > > > > >>>>>>>> The correlation ID is used within a single TCP > > session, > > > > to > > > > > > > > uniquely > > > > > > > > > >>>>>>>> associate a request with a response. The > > correlation > > > > ID is > > > > > > not > > > > > > > > unique > > > > > > > > > >>>>>>>> (and has no meaning) outside the context of that > > single > > > > TCP > > > > > > > > session. > > > > > > > > > >>>>>>>> > > > > > > > > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP > > > > sessions, > > > > > > and > > > > > > > > generally > > > > > > > > > >>>>>>>> tries to hide that information from the upper > > layers of > > > > the > > > > > > > > code. So > > > > > > > > > >>>>>>>> when you submit a request to NetworkClient, you > > don't > > > > know > > > > > > if > > > > > > > > that > > > > > > > > > >>>>>>>> request creates a TCP session, or reuses an existing > > > > one. > > > > > > > > > >>>>>>>>>> Unfortunately, this doesn't work. Imagine the > > client > > > > > > misses > > > > > > > > an > > > > > > > > > >>>>>>>>>> increment fetch response about a partition. And > > then > > > > the > > > > > > > > partition is > > > > > > > > > >>>>>>>>>> never updated after that. The client has no way > > to > > > > know > > > > > > > > about the > > > > > > > > > >>>>>>>>>> partition, since it won't be included in any > > future > > > > > > > > incremental fetch > > > > > > > > > >>>>>>>>>> responses. And there are no offsets to compare, > > > > since the > > > > > > > > partition is > > > > > > > > > >>>>>>>>>> simply omitted from the response. > > > > > > > > > >>>>>>>>> I am curious about in which situation would the > > > > follower > > > > > > miss > > > > > > > > a response > > > > > > > > > >>>>>>>>> of a partition. If the entire FetchResponse is lost > > > > (e.g. > > > > > > > > timeout), the > > > > > > > > > >>>>>>>>> follower would disconnect and retry. That will > > result > > > > in > > > > > > > > sending a full > > > > > > > > > >>>>>>>>> FetchRequest. 
> > > > > > > > > >>>>>>>> Basically, you are proposing that we rely on TCP for > > > > > > reliable > > > > > > > > delivery > > > > > > > > > >>>>>>>> in a distributed system. That isn't a good idea > > for a > > > > > > bunch of > > > > > > > > > >>>>>>>> different reasons. First of all, TCP timeouts tend > > to > > > > be > > > > > > very > > > > > > > > long. So > > > > > > > > > >>>>>>>> if the TCP session timing out is your error > > detection > > > > > > > > mechanism, you > > > > > > > > > >>>>>>>> have to wait minutes for messages to timeout. Of > > > > course, we > > > > > > > > add a > > > > > > > > > >>>>>>>> timeout on top of that after which we declare the > > > > connection > > > > > > > > bad and > > > > > > > > > >>>>>>>> manually close it. But just because the session is > > > > closed > > > > > > on > > > > > > > > one end > > > > > > > > > >>>>>>>> doesn't mean that the other end knows that it is > > > > closed. So > > > > > > > > the leader > > > > > > > > > >>>>>>>> may have to wait quite a long time before TCP > > decides > > > > that > > > > > > yes, > > > > > > > > > >>>>>>>> connection X from the follower is dead and not > > coming > > > > back, > > > > > > > > even though > > > > > > > > > >>>>>>>> gremlins ate the FIN packet which the follower > > > > attempted to > > > > > > > > translate. > > > > > > > > > >>>>>>>> If the cache state is tied to that TCP session, we > > have > > > > to > > > > > > keep > > > > > > > > that > > > > > > > > > >>>>>>>> cache around for a much longer time than we should. > > > > > > > > > >>>>>>> Hi, > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> I see this from a different perspective. The cache > > expiry > > > > > > time > > > > > > > > > >>>>>>> has the same semantic as idle connection time in this > > > > > > scenario. > > > > > > > > > >>>>>>> It is the time range we expect the client to come > > back an > > > > > > reuse > > > > > > > > > >>>>>>> its broker side state. I would argue that on close we > > > > would > > > > > > get > > > > > > > > an > > > > > > > > > >>>>>>> extra shot at cleaning up the session state early. As > > > > > > opposed to > > > > > > > > > >>>>>>> always wait for that duration for expiry to happen. > > > > > > > > > >>>>>> Hi Jan, > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> The idea here is that the incremental fetch cache > > expiry > > > > time > > > > > > can > > > > > > > > be > > > > > > > > > >>>>>> much shorter than the TCP session timeout. In general > > > > the TCP > > > > > > > > session > > > > > > > > > >>>>>> timeout is common to all TCP connections, and very > > long. > > > > To > > > > > > make > > > > > > > > these > > > > > > > > > >>>>>> numbers a little more concrete, the TCP session > > timeout is > > > > > > often > > > > > > > > > >>>>>> configured to be 2 hours on Linux. (See > > > > > > > > > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or- > > > > > > > > decreasing-tcp-sockets-timeouts.html > > > > > > > > > >>>>>> ) The timeout I was proposing for incremental fetch > > > > sessions > > > > > > was > > > > > > > > one or > > > > > > > > > >>>>>> two minutes at most. > > > > > > > > > >>>>> Currently this is taken care of by > > > > > > > > > >>>>> connections.max.idle.ms on the broker and defaults to > > > > > > something > > > > > > > > of few > > > > > > > > > >>>>> minutes. > > > > > > > > > >>>> It is 10 minutes by default, which is longer than what > > we > > > > want > > > > > > the > > > > > > > > > >>>> incremental fetch session timeout to be. 
There's no > > reason > > > > to > > > > > > > > couple > > > > > > > > > >>>> these two things. > > > > > > > > > >>>> > > > > > > > > > >>>>> Also something we could let the client change if we > > really > > > > > > wanted > > > > > > > > to. > > > > > > > > > >>>>> So there is no need to worry about coupling our > > > > implementation > > > > > > to > > > > > > > > some > > > > > > > > > >>>>> timeouts given by the OS, with TCP one always has full > > > > control > > > > > > > > over the worst > > > > > > > > > >>>>> times + one gets the extra shot cleaning up early when > > the > > > > > > close > > > > > > > > comes through. > > > > > > > > > >>>>> Which is the majority of the cases. > > > > > > > > > >>>> In the majority of cases, the TCP session will be > > > > > > re-established. > > > > > > > > In > > > > > > > > > >>>> that case, we have to send a full fetch request rather > > than > > > > an > > > > > > > > > >>>> incremental fetch request. > > > > > > > > > >>> I actually have a hard time believing this. Do you have > > any > > > > > > numbers > > > > > > > > of > > > > > > > > > >>> any existing production system? Is it the virtualisation > > > > layer > > > > > > > > cutting > > > > > > > > > >>> all the connections? > > > > > > > > > >>> We see this only on application crashes and restarts > > where > > > > the > > > > > > app > > > > > > > > needs > > > > > > > > > >>> todo the full anyways > > > > > > > > > >>> as it probably continues with stores offsets. > > > > > > > > > >> Yes, TCP connections get dropped. It happens very often > > in > > > > > > production > > > > > > > > > >> clusters, actually. When I was working on Hadoop, one of > > the > > > > most > > > > > > > > > >> common questions I heard from newcomers was "why do I see > > so > > > > many > > > > > > > > > >> EOFException messages in the logs"? The other thing that > > > > happens > > > > > > a > > > > > > > > lot > > > > > > > > > >> is DNS outages or slowness. Public clouds seem to have > > even > > > > more > > > > > > > > > >> unstable networks than the on-premise clusters. I am not > > > > sure why > > > > > > > > that > > > > > > > > > >> is. > > > > > > > > > Hadoop has a wiki page on exactly this > > > > > > > > > https://wiki.apache.org/hadoop/EOFException > > > > > > > > > > > > > > > > > > besides user errors they have servers crashing and actually > > loss > > > > of > > > > > > > > > connection high on their list. > > > > > > > > > In the case of "server goes away" the cache goes with it. So > > > > nothing > > > > > > to > > > > > > > > > argue about the cache beeing reused by > > > > > > > > > a new connection. > > > > > > > > > > > > > > > > > > Can you make an argument at which point the epoch would be > > > > updated > > > > > > > > > broker side to maximise re-usage of the cache on > > > > > > > > > lost connections. In many cases the epoch would go out of > > sync > > > > and we > > > > > > > > > would need a full fetch anyways. Am I mistaken here? > > > > > > > > > > > > > > > > The current proposal is that the server can accept multiple > > > > requests > > > > > > in a > > > > > > > > row with the same sequence number. > > > > > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > >>>>>>>> Secondly, from a software engineering perspective, > > it's > > > > not > > > > > > a > > > > > > > > good idea > > > > > > > > > >>>>>>>> to try to tightly tie together TCP and our code. 
We > > > > would > > > > > > have > > > > > > > > to > > > > > > > > > >>>>>>>> rework how we interact with NetworkClient so that > > we are > > > > > > aware > > > > > > > > of things > > > > > > > > > >>>>>>>> like TCP sessions closing or opening. We would > > have to > > > > be > > > > > > > > careful > > > > > > > > > >>>>>>>> preserve the ordering of incoming messages when > > doing > > > > things > > > > > > > > like > > > > > > > > > >>>>>>>> putting incoming requests on to a queue to be > > processed > > > > by > > > > > > > > multiple > > > > > > > > > >>>>>>>> threads. It's just a lot of complexity to add, and > > > > there's > > > > > > no > > > > > > > > upside. > > > > > > > > > >>>>>>> I see the point here. And I had a small chat with > > Dong > > > > Lin > > > > > > > > already > > > > > > > > > >>>>>>> making me aware of this. I tried out the approaches > > and > > > > > > propose > > > > > > > > the > > > > > > > > > >>>>>>> following: > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> The client start and does a full fetch. It then does > > > > > > incremental > > > > > > > > fetches. > > > > > > > > > >>>>>>> The connection to the broker dies and is > > re-established > > > > by > > > > > > > > NetworkClient > > > > > > > > > >>>>>>> under the hood. > > > > > > > > > >>>>>>> The broker sees an incremental fetch without having > > > > state => > > > > > > > > returns > > > > > > > > > >>>>>>> error: > > > > > > > > > >>>>>>> Client sees the error, does a full fetch and goes > > back to > > > > > > > > incrementally > > > > > > > > > >>>>>>> fetching. > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> having this 1 additional error round trip is > > essentially > > > > the > > > > > > > > same as > > > > > > > > > >>>>>>> when something > > > > > > > > > >>>>>>> with the sessions or epoch changed unexpectedly to > > the > > > > client > > > > > > > > (say > > > > > > > > > >>>>>>> expiry). > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> So its nothing extra added but the conditions are > > easier > > > > to > > > > > > > > evaluate. > > > > > > > > > >>>>>>> Especially since we do everything with NetworkClient. > > > > Other > > > > > > > > implementers > > > > > > > > > >>>>>>> on the > > > > > > > > > >>>>>>> protocol are free to optimizes this and do not do the > > > > > > errornours > > > > > > > > > >>>>>>> roundtrip on the > > > > > > > > > >>>>>>> new connection. > > > > > > > > > >>>>>>> Its a great plus that the client can know when the > > error > > > > is > > > > > > gonna > > > > > > > > > >>>>>>> happen. instead of > > > > > > > > > >>>>>>> the server to always have to report back if something > > > > changes > > > > > > > > > >>>>>>> unexpectedly for the client > > > > > > > > > >>>>>> You are assuming that the leader and the follower > > agree > > > > that > > > > > > the > > > > > > > > TCP > > > > > > > > > >>>>>> session drops at the same time. When there are > > network > > > > > > problems, > > > > > > > > this > > > > > > > > > >>>>>> may not be true. The leader may still think the > > previous > > > > TCP > > > > > > > > session is > > > > > > > > > >>>>>> active. In that case, we have to keep the incremental > > > > fetch > > > > > > > > session > > > > > > > > > >>>>>> state around until we learn otherwise (which could be > > up > > > > to > > > > > > that > > > > > > > > 2 hour > > > > > > > > > >>>>>> timeout I mentioned). 
And if we get a new incoming > > > > > > incremental > > > > > > > > fetch > > > > > > > > > >>>>>> request, we can't assume that it replaces the previous > > > > one, > > > > > > > > because the > > > > > > > > > >>>>>> IDs will be different (the new one starts a new > > session). > > > > > > > > > >>>>> As mentioned, no reason to fear some time-outs out of > > our > > > > > > control > > > > > > > > > >>>>>>>> Imagine that I made an argument that client IDs are > > > > > > "complex" > > > > > > > > and should > > > > > > > > > >>>>>>>> be removed from our APIs. After all, we can just > > look > > > > at > > > > > > the > > > > > > > > remote IP > > > > > > > > > >>>>>>>> address and TCP port of each connection. Would you > > > > think > > > > > > that > > > > > > > > was a > > > > > > > > > >>>>>>>> good idea? The client ID is useful when looking at > > > > logs. > > > > > > For > > > > > > > > example, > > > > > > > > > >>>>>>>> if a rebalance is having problems, you want to know > > what > > > > > > > > clients were > > > > > > > > > >>>>>>>> having a problem. So having the client ID field to > > > > guide > > > > > > you is > > > > > > > > > >>>>>>>> actually much less "complex" in practice than not > > > > having an > > > > > > ID. > > > > > > > > > >>>>>>> I still cant follow why the correlation idea will not > > > > help > > > > > > here. > > > > > > > > > >>>>>>> Correlating logs with it usually works great. Even > > with > > > > > > > > primitive tools > > > > > > > > > >>>>>>> like grep > > > > > > > > > >>>>>> The correlation ID does help somewhat, but certainly > > not > > > > as > > > > > > much > > > > > > > > as a > > > > > > > > > >>>>>> unique 64-bit ID. The correlation ID is not unique > > in the > > > > > > > > broker, just > > > > > > > > > >>>>>> unique to a single NetworkClient. Simiarly, the > > > > correlation > > > > > > ID > > > > > > > > is not > > > > > > > > > >>>>>> unique on the client side, if there are multiple > > > > Consumers, > > > > > > etc. > > > > > > > > > >>>>> Can always bump entropy in correlation IDs, never had a > > > > problem > > > > > > > > > >>>>> of finding to many duplicates. Would be a different KIP > > > > though. > > > > > > > > > >>>>>>>> Similarly, if metadata responses had epoch numbers > > > > (simple > > > > > > > > incrementing > > > > > > > > > >>>>>>>> numbers), we would not have to debug problems like > > > > clients > > > > > > > > accidentally > > > > > > > > > >>>>>>>> getting old metadata from servers that had been > > > > partitioned > > > > > > off > > > > > > > > from the > > > > > > > > > >>>>>>>> network for a while. Clients would know the > > difference > > > > > > between > > > > > > > > old and > > > > > > > > > >>>>>>>> new metadata. So putting epochs in to the metadata > > > > request > > > > > > is > > > > > > > > much less > > > > > > > > > >>>>>>>> "complex" operationally, even though it's an extra > > > > field in > > > > > > the > > > > > > > > request. > > > > > > > > > >>>>>>>> This has been discussed before on the mailing > > list. > > > > > > > > > >>>>>>>> > > > > > > > > > >>>>>>>> So I think the bottom line for me is that having the > > > > > > session ID > > > > > > > > and > > > > > > > > > >>>>>>>> session epoch, while it adds two extra fields, > > reduces > > > > > > > > operational > > > > > > > > > >>>>>>>> complexity and increases debuggability. 
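A hypothetical sketch of what that epoch check could look like on the leader. Per the discussion above, a repeat of the previous epoch is treated as a resend (the earlier response may have been lost), while anything else out of order invalidates the session; all names here are assumptions:

// Hypothetical sketch of leader-side epoch validation for an incremental fetch session.
// Accepting the same epoch twice lets the follower resend a request whose response was
// lost; any other mismatch forces the follower back to a full fetch.
class SessionEpochValidator {
    private int lastEpoch = 0;

    enum Result { OK, RESEND, INVALID }

    synchronized Result validate(int requestEpoch) {
        if (requestEpoch == lastEpoch + 1) {
            lastEpoch = requestEpoch;
            return Result.OK;        // normal progression
        } else if (requestEpoch == lastEpoch) {
            return Result.RESEND;    // duplicate of the previous request; respond again
        } else {
            return Result.INVALID;   // reordered or stale request: reply with a session error
        }
    }
}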
It avoids > > > > tightly > > > > > > > > coupling us > > > > > > > > > >>>>>>>> to assumptions about reliable ordered delivery which > > > > tend > > > > > > to be > > > > > > > > violated > > > > > > > > > >>>>>>>> in practice in multiple layers of the stack. > > Finally, > > > > it > > > > > > > > avoids the > > > > > > > > > >>>>>>>> necessity of refactoring NetworkClient. > > > > > > > > > >>>>>>> So there is stacks out there that violate TCP > > > > guarantees? And > > > > > > > > software > > > > > > > > > >>>>>>> still works? How can this be? Can you elaborate a > > little > > > > > > where > > > > > > > > this > > > > > > > > > >>>>>>> can be violated? I am not very familiar with > > virtualized > > > > > > > > environments > > > > > > > > > >>>>>>> but they can't really violate TCP contracts. > > > > > > > > > >>>>>> TCP's guarantees of reliable, in-order transmission > > > > certainly > > > > > > can > > > > > > > > be > > > > > > > > > >>>>>> violated. For example, I once had to debug a cluster > > > > where a > > > > > > > > certain > > > > > > > > > >>>>>> node had a network card which corrupted its > > transmissions > > > > > > > > occasionally. > > > > > > > > > >>>>>> With all the layers of checksums, you would think that > > > > this > > > > > > was > > > > > > > > not > > > > > > > > > >>>>>> possible, but it happened. We occasionally got > > corrupted > > > > data > > > > > > > > written > > > > > > > > > >>>>>> to disk on the other end because of it. Even more > > > > > > frustrating, > > > > > > > > the data > > > > > > > > > >>>>>> was not corrupted on disk on the sending node-- it > > was a > > > > bug > > > > > > in > > > > > > > > the > > > > > > > > > >>>>>> network card driver that was injecting the errors. > > > > > > > > > >>>>> true, but your broker might aswell read a corrupted > > 600GB > > > > as > > > > > > size > > > > > > > > from > > > > > > > > > >>>>> the network and die with OOM instantly. > > > > > > > > > >>>> If you read 600 GB as the size from the network, you > > will > > > > not > > > > > > "die > > > > > > > > with > > > > > > > > > >>>> OOM instantly." That would be a bug. Instead, you will > > > > notice > > > > > > > > that 600 > > > > > > > > > >>>> GB is greater than max.message.bytes, and close the > > > > connection. > > > > > > > > > >>> We only check max.message.bytes to late to guard against > > > > consumer > > > > > > > > > >>> stalling. > > > > > > > > > >>> we dont have a notion of max.networkpacket.size before we > > > > > > allocate > > > > > > > > the > > > > > > > > > >>> bytebuffer to read it into. > > > > > > > > > >> "network packets" are not the same thing as "kafka > > RPCs." One > > > > > > Kafka > > > > > > > > RPC > > > > > > > > > >> could take up mutiple ethernet packets. > > > > > > > > > >> > > > > > > > > > >> Also, max.message.bytes has nothing to do with "consumer > > > > > > stalling" -- > > > > > > > > > >> you are probably thinking about some of the fetch request > > > > > > > > > >> configurations. max.message.bytes is used by the RPC > > system > > > > to > > > > > > figure > > > > > > > > > >> out whether to read the full incoming RP > > > > > > > > > > Whoops, this is incorrect. I was thinking about > > > > > > > > > > "socket.request.max.bytes" rather than "max.message.bytes." > > > > Sorry > > > > > > > > about > > > > > > > > > > that. See Ismael's email as well. 
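For the size-prefix point above, a minimal sketch of the kind of guard a broker applies: validate the size against a configured limit (the role socket.request.max.bytes plays) before allocating a buffer. This is illustrative, not the actual broker code:

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch only: read a 4-byte size prefix and refuse to allocate a buffer for anything
// larger than the configured limit, instead of trusting a possibly corrupted size.
class SizeLimitedReader {
    static ByteBuffer readRequest(InputStream in, int maxRequestBytes) throws IOException {
        byte[] sizeBytes = new byte[4];
        int read = 0;
        while (read < 4) {
            int n = in.read(sizeBytes, read, 4 - read);
            if (n < 0) throw new EOFException("connection closed while reading size");
            read += n;
        }
        int size = ByteBuffer.wrap(sizeBytes).getInt();
        if (size < 0 || size > maxRequestBytes) {
            // A corrupted or hostile size prefix: close the connection rather than allocate.
            throw new IOException("request size " + size + " exceeds limit " + maxRequestBytes);
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        // ... read exactly `size` more bytes into buf ...
        return buf;
    }
}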
> > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > >> best, > > > > > > > > > >> Colin > > > > > > > > > >> > > > > > > > > > >>>>> Optimizing for still having functional > > > > > > > > > >>>>> software under this circumstances is not reasonable. > > > > > > > > > >>>>> You want to get rid of such a > > > > > > > > > >>>>> node ASAP and pray that zookeepers ticks get corrupted > > > > often > > > > > > enough > > > > > > > > > >>>>> that it finally drops out of the cluster. > > > > > > > > > >>>>> > > > > > > > > > >>>>> There is a good reason that these kinda things > > > > > > > > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105 > > > > > > > > > >>>>> don't end up as kafka Jiras. In the end you can't run > > any > > > > > > software > > > > > > > > in > > > > > > > > > >>>>> these containers anymore. Application layer checksums > > are a > > > > > > neat > > > > > > > > thing to > > > > > > > > > >>>>> fail fast but trying to cope with this probably causes > > > > more bad > > > > > > > > than > > > > > > > > > >>>>> good. So I would argue that we shouldn't try this for > > the > > > > > > fetch > > > > > > > > requests. > > > > > > > > > >>>> One of the goals of Apache Kafka is to be "a streaming > > > > > > platform... > > > > > > > > > >>>> [that] lets you store streams of records in a > > fault-tolerant > > > > > > way." > > > > > > > > For > > > > > > > > > >>>> more information, see https://kafka.apache.org/intro . > > > > > > > > Fault-tolerance > > > > > > > > > >>>> is explicitly part of the goal of Kafka. Prayer should > > be > > > > > > > > optional, not > > > > > > > > > >>>> required, when running the software. > > > > > > > > > >>> Yes, we need to fail ASAP when we read corrupted > > packages. It > > > > > > seemed > > > > > > > > > >>> to me like you tried to make the case for pray and try to > > > > stay > > > > > > alive. > > > > > > > > > >>> Fault > > > > > > > > > >>> tolerance here means. I am a fishy box i am going to let > > a > > > > good > > > > > > box > > > > > > > > > >>> handle > > > > > > > > > >>> it and be silent until i get fixed up. > > > > > > > > > >>>> Crashing because someone sent you a bad packet is not > > > > reasonable > > > > > > > > > >>>> behavior. It is a bug. Similarly, bringing down the > > whole > > > > > > cluster, > > > > > > > > > >>>> which could a hundred nodes, because someone had a bad > > > > network > > > > > > > > adapter > > > > > > > > > >>>> is not reasonable behavior. It is perhaps reasonable > > for > > > > the > > > > > > > > cluster to > > > > > > > > > >>>> perform worse when hardware is having problems. But > > that's > > > > a > > > > > > > > different > > > > > > > > > >>>> discussion. > > > > > > > > > >>> See above. > > > > > > > > > >>>> best, > > > > > > > > > >>>> Colin > > > > > > > > > >>>> > > > > > > > > > >>>>>> However, my point was not about TCP's guarantees being > > > > > > violated. > > > > > > > > My > > > > > > > > > >>>>>> point is that TCP's guarantees are only one small > > building > > > > > > block > > > > > > > > to > > > > > > > > > >>>>>> build a robust distributed system. TCP basically just > > > > says > > > > > > that > > > > > > > > if you > > > > > > > > > >>>>>> get any bytes from the stream, you will get the ones > > that > > > > were > > > > > > > > sent by > > > > > > > > > >>>>>> the sender, in the order they were sent. 
TCP does not > > > > > > guarantee > > > > > > > > that > > > > > > > > > >>>>>> the bytes you send will get there. It does not > > guarantee > > > > > > that if > > > > > > > > you > > > > > > > > > >>>>>> close the connection, the other end will know about > > it in > > > > a > > > > > > timely > > > > > > > > > >>>>>> fashion. > > > > > > > > > >>>>> These are very powerful grantees and since we use TCP > > we > > > > should > > > > > > > > > >>>>> piggy pack everything that is reasonable on to it. IMO > > > > there > > > > > > is no > > > > > > > > > >>>>> need to reimplement correct sequencing again if you get > > > > that > > > > > > from > > > > > > > > > >>>>> your transport layer. It saves you the complexity, it > > makes > > > > > > > > > >>>>> you application behave way more naturally and your api > > > > easier > > > > > > to > > > > > > > > > >>>>> understand. > > > > > > > > > >>>>> > > > > > > > > > >>>>> There is literally nothing the Kernel wont let you > > decide > > > > > > > > > >>>>> especially not any timings. Only noticeable exception > > being > > > > > > > > TIME_WAIT > > > > > > > > > >>>>> of usually 240 seconds but that already has little todo > > > > with > > > > > > the > > > > > > > > broker > > > > > > > > > >>>>> itself and > > > > > > > > > >>>>> if we are running out of usable ports because of this > > then > > > > > > expiring > > > > > > > > > >>>>> fetch requests > > > > > > > > > >>>>> wont help much anyways. > > > > > > > > > >>>>> > > > > > > > > > >>>>> I hope I could strengthen the trust you have in > > userland > > > > TCP > > > > > > > > connection > > > > > > > > > >>>>> management. It is really powerful and can be exploited > > for > > > > > > maximum > > > > > > > > gains > > > > > > > > > >>>>> without much risk in my opinion. > > > > > > > > > >>>>> > > > > > > > > > >>>>> > > > > > > > > > >>>>> > > > > > > > > > >>>>>> It does not guarantee that the bytes will be received > > in a > > > > > > > > > >>>>>> certain timeframe, and certainly doesn't guarantee > > that > > > > if you > > > > > > > > send a > > > > > > > > > >>>>>> byte on connection X and then on connection Y, that > > the > > > > remote > > > > > > > > end will > > > > > > > > > >>>>>> read a byte on X before reading a byte on Y. > > > > > > > > > >>>>> Noone expects this from two independent paths of any > > kind. > > > > > > > > > >>>>> > > > > > > > > > >>>>>> best, > > > > > > > > > >>>>>> Colin > > > > > > > > > >>>>>> > > > > > > > > > >>>>>>> Hope this made my view clearer, especially the first > > > > part. > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> Best Jan > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>>> best, > > > > > > > > > >>>>>>>> Colin > > > > > > > > > >>>>>>>> > > > > > > > > > >>>>>>>> > > > > > > > > > >>>>>>>>> If there is an error such as NotLeaderForPartition > > is > > > > > > > > > >>>>>>>>> returned for some partitions, the follower can > > always > > > > send > > > > > > a > > > > > > > > full > > > > > > > > > >>>>>>>>> FetchRequest. Is there a scenario that only some > > of the > > > > > > > > partitions in a > > > > > > > > > >>>>>>>>> FetchResponse is lost? 
> > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>> Thanks, > > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>> Jiangjie (Becket) Qin > > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe< > > > > > > > > cmccabe@apache.org> wrote: > > > > > > > > > >>>>>>>>> > > > > > > > > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > > > > > > > > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe< > > > > > > > > cmccabe@apache.org> > > > > > > > > > >>>>>>>>>> wrote: > > > > > > > > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > > > > > > > > > >>>>>>>>>>>>> Hey Colin, > > > > > > > > > >>>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>>> Thanks much for the update. I have a few > > questions > > > > > > below: > > > > > > > > > >>>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch > > Session > > > > > > Epoch. It > > > > > > > > seems that > > > > > > > > > >>>>>>>>>>>>> Fetch > > > > > > > > > >>>>>>>>>>>>> Session Epoch is only needed to help leader > > > > distinguish > > > > > > > > between "a > > > > > > > > > >>>>>>>>>> full > > > > > > > > > >>>>>>>>>>>>> fetch request" and "a full fetch request and > > > > request a > > > > > > new > > > > > > > > > >>>>>>>>>> incremental > > > > > > > > > >>>>>>>>>>>>> fetch session". Alternatively, follower can > > also > > > > > > indicate > > > > > > > > "a full > > > > > > > > > >>>>>>>>>> fetch > > > > > > > > > >>>>>>>>>>>>> request and request a new incremental fetch > > > > session" by > > > > > > > > setting Fetch > > > > > > > > > >>>>>>>>>>>>> Session ID to -1 without using Fetch Session > > Epoch. > > > > > > Does > > > > > > > > this make > > > > > > > > > >>>>>>>>>> sense? > > > > > > > > > >>>>>>>>>>>> Hi Dong, > > > > > > > > > >>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>> The fetch session epoch is very important for > > > > ensuring > > > > > > > > correctness. It > > > > > > > > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due > > to > > > > > > network > > > > > > > > reordering > > > > > > > > > >>>>>>>>>> or > > > > > > > > > >>>>>>>>>>>> loss. > > > > > > > > > >>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>> For example, consider a scenario where the > > follower > > > > > > sends a > > > > > > > > fetch > > > > > > > > > >>>>>>>>>>>> request to the leader. The leader responds, > > but the > > > > > > > > response is lost > > > > > > > > > >>>>>>>>>>>> because of network problems which affected the > > TCP > > > > > > > > session. In that > > > > > > > > > >>>>>>>>>>>> case, the follower must establish a new TCP > > session > > > > and > > > > > > > > re-send the > > > > > > > > > >>>>>>>>>>>> incremental fetch request. But the leader does > > not > > > > know > > > > > > > > that the > > > > > > > > > >>>>>>>>>>>> follower didn't receive the previous incremental > > > > fetch > > > > > > > > response. It is > > > > > > > > > >>>>>>>>>>>> only the incremental fetch epoch which lets the > > > > leader > > > > > > know > > > > > > > > that it > > > > > > > > > >>>>>>>>>>>> needs to resend that data, and not data which > > comes > > > > > > > > afterwards. > > > > > > > > > >>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>> You could construct similar scenarios with > > message > > > > > > > > reordering, > > > > > > > > > >>>>>>>>>>>> duplication, etc. 
Basically, this is a stateful > > > > > > protocol > > > > > > > > on an > > > > > > > > > >>>>>>>>>>>> unreliable network, and you need to know > > whether the > > > > > > > > follower got the > > > > > > > > > >>>>>>>>>>>> previous data you sent before you move on. And > > you > > > > > > need to > > > > > > > > handle > > > > > > > > > >>>>>>>>>>>> issues like duplicated or delayed requests. > > These > > > > > > issues > > > > > > > > do not affect > > > > > > > > > >>>>>>>>>>>> the full fetch request, because it is not > > > > stateful-- any > > > > > > > > full fetch > > > > > > > > > >>>>>>>>>>>> request can be understood and properly > > responded to > > > > in > > > > > > > > isolation. > > > > > > > > > >>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>> Thanks for the explanation. This makes sense. On > > the > > > > > > other > > > > > > > > hand I would > > > > > > > > > >>>>>>>>>>> be interested in learning more about whether > > Becket's > > > > > > > > solution can help > > > > > > > > > >>>>>>>>>>> simplify the protocol by not having the echo > > field > > > > and > > > > > > > > whether that is > > > > > > > > > >>>>>>>>>>> worth doing. > > > > > > > > > >>>>>>>>>> Hi Dong, > > > > > > > > > >>>>>>>>>> > > > > > > > > > >>>>>>>>>> I commented about this in the other thread. A > > > > solution > > > > > > which > > > > > > > > doesn't > > > > > > > > > >>>>>>>>>> maintain session information doesn't work here. > > > > > > > > > >>>>>>>>>> > > > > > > > > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest > > will > > > > > > include > > > > > > > > partitions > > > > > > > > > >>>>>>>>>> whose > > > > > > > > > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes > > has > > > > been > > > > > > > > changed. If > > > > > > > > > >>>>>>>>>>>>> follower's logStartOffet of a partition has > > > > changed, > > > > > > > > should this > > > > > > > > > >>>>>>>>>>>>> partition also be included in the next > > > > FetchRequest to > > > > > > the > > > > > > > > leader? > > > > > > > > > >>>>>>>>>>>> Otherwise, it > > > > > > > > > >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest > > > > because > > > > > > > > leader may > > > > > > > > > >>>>>>>>>> not > > > > > > > > > >>>>>>>>>>>> know > > > > > > > > > >>>>>>>>>>>>> the corresponding data has been deleted on the > > > > > > follower. > > > > > > > > > >>>>>>>>>>>> Yeah, the follower should include the partition > > if > > > > the > > > > > > > > logStartOffset > > > > > > > > > >>>>>>>>>>>> has changed. That should be spelled out on the > > KIP. > > > > > > Fixed. > > > > > > > > > >>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a > > > > partition is > > > > > > not > > > > > > > > considered > > > > > > > > > >>>>>>>>>>>>> dirty if its log start offset has changed. > > Later > > > > in the > > > > > > > > section > > > > > > > > > >>>>>>>>>>>> "FetchRequest > > > > > > > > > >>>>>>>>>>>>> Changes", it is said that incremental fetch > > > > responses > > > > > > will > > > > > > > > include a > > > > > > > > > >>>>>>>>>>>>> partition if its logStartOffset has changed. It > > > > seems > > > > > > > > inconsistent. > > > > > > > > > >>>>>>>>>> Can > > > > > > > > > >>>>>>>>>>>>> you update the KIP to clarify it? > > > > > > > > > >>>>>>>>>>>>> > > > > > > > > > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say > > > > that > > > > > > > > logStartOffset > > > > > > > > > >>>>>>>>>>>> changes make a partition dirty, though, right? 
>>>>>>>>>>>>> 4. In the "Fetch Session Caching" section, it is said that each broker has a limited number of slots. How is this number determined? Does this require a new broker config for this number?
>>>>>>>>>>>> Good point. I added two broker configuration parameters to control this number.
>>>>>>>>>>>>
>>>>>>>>>>> I am curious to see whether we can avoid some of these new configs. For example, incremental.fetch.session.cache.slots.per.broker is probably not necessary, because if a leader knows that a FetchRequest comes from a follower, we probably want the leader to always cache the information from that follower. Does this make sense?
>>>>>>>>>> Yeah, maybe we can avoid having incremental.fetch.session.cache.slots.per.broker.
>>>>>>>>>>
>>>>>>>>>>> Maybe we can discuss the config later, after there is agreement on what the protocol would look like.
>>>>>>>>>>>
>>>>>>>>>>>>> What is the error code if broker does not have new log for the incoming FetchRequest?
>>>>>>>>>>>> Hmm, is there a typo in this question? Maybe you meant to ask what happens if there is no new cache slot for the incoming FetchRequest? That's not an error-- the incremental fetch session ID just gets set to 0, indicating no incremental fetch session was created.
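A sketch of that "no free slot" path (the allocator and its names are made up for illustration; the real broker-side structure may differ): when the cache is full, the request is still answered normally, the response just carries session ID 0 and the client keeps sending full fetch requests:

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch only: handing out incremental fetch session IDs from a bounded cache.
    class FetchSessionAllocator {
        static final int NO_SESSION = 0;   // "no incremental fetch session was created"

        private final int maxSlots;        // bounded by broker configuration
        private final AtomicInteger usedSlots = new AtomicInteger();
        private final AtomicInteger nextSessionId = new AtomicInteger(1);

        FetchSessionAllocator(int maxSlots) {
            this.maxSlots = maxSlots;
        }

        /** Called when a full fetch request asks to establish an incremental session. */
        int maybeCreateSession() {
            while (true) {
                int used = usedSlots.get();
                if (used >= maxSlots) {
                    // Cache full: not an error; the response simply reports session ID 0.
                    return NO_SESSION;
                }
                if (usedSlots.compareAndSet(used, used + 1)) {
                    return nextSessionId.getAndIncrement();
                }
            }
        }

        void releaseSession() {
            usedSlots.decrementAndGet();
        }
    }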
>>>>>>>>>>> Yeah there is a typo. You have answered my question.
>>>>>>>>>>>
>>>>>>>>>>>>> 5. Can you clarify what happens if the follower adds a partition to the ReplicaFetcherThread after receiving a LeaderAndIsrRequest? Does the leader need to generate a new session for this ReplicaFetcherThread or does it re-use the existing session? If it uses a new session, is the old session actively deleted from the slot?
>>>>>>>>>>>> The basic idea is that you can't make changes, except by sending a full fetch request. However, perhaps we can allow the client to re-use its existing session ID. If the client sets sessionId = id, epoch = 0, it could re-initialize the session.
>>>>>>>>>>>>
>>>>>>>>>>> Yeah I agree with the basic idea. We probably want to understand more detail about how this works later.
>>>>>>>>>> Sounds good. I updated the KIP with this information. A re-initialization should be exactly the same as an initialization, except that it reuses an existing ID.
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>>
>>>>>>>>>>>>> BTW, I think it may be useful if the KIP can include an example workflow of how this feature will be used in case of a partition change and so on.
>>>>>>>>>>>> Yeah, that might help.
>>>>>>>>>>>>
>>>>>>>>>>>> best,
>>>>>>>>>>>> Colin
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmccabe@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I updated the KIP with the ideas we've been discussing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>> Colin
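The "re-use the ID" idea discussed above can be pictured as a small classifier over the (sessionId, epoch) pair in a fetch request -- a sketch under the assumption stated in the exchange, not the final wire format of the KIP:

    // Sketch only: how a leader might interpret (sessionId, epoch) on an incoming
    // fetch request, following the re-use-the-ID idea discussed above.
    class FetchRequestKind {
        enum Kind { FULL_FETCH_NEW_SESSION, FULL_FETCH_REINITIALIZE, INCREMENTAL_FETCH }

        static Kind classify(int sessionId, int epoch) {
            if (epoch == 0 && sessionId == 0) {
                // Plain full fetch; the leader may hand back a freshly allocated session ID.
                return Kind.FULL_FETCH_NEW_SESSION;
            }
            if (epoch == 0) {
                // Full fetch that re-uses an existing ID: the old partition set is
                // discarded and rebuilt from this request, exactly like an initialization.
                return Kind.FULL_FETCH_REINITIALIZE;
            }
            // Non-zero epoch: incremental fetch against an existing session.
            return Kind.INCREMENTAL_FETCH;
        }
    }

A re-initialization is then just the second branch: the same ID, with the partition set rebuilt from scratch.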
>>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
>>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
>>>>>>>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a really useful thing. I just scanned through the discussion so far and wanted to start a thread to make a decision about keeping the cache with the Connection / Session or having some sort of UUID-indexed global Map.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Sorry if that has been settled already and I missed it. In this case could anyone point me to the discussion?
>>>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache to an individual TCP session yet. I agree that since the cache is intended to be used only by a single follower or client, it's an interesting thing to think about.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP session drops, you have to make a full fetch request rather than an incremental one. It's not clear to me how often this happens in practice -- it probably depends a lot on the quality of the network. From a code perspective, it might also be a bit difficult to access data associated with the Session from classes like KafkaApis (although we could refactor it to make this easier).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's also clear that even if we tie the cache to the session, we still have to have limits on the number of caches we're willing to create. And probably we should reserve some cache slots for each follower, so that clients don't take all of them.
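One way to picture the "reserve slots for followers" idea (a hypothetical policy sketch; the actual configuration and eviction rules were still open at this point in the thread) is a quota split between replica fetchers and ordinary clients:

    // Sketch only: a session-cache quota that keeps some slots for followers so
    // that consumer clients cannot occupy every slot.
    class PartitionedSessionQuota {
        private final int followerSlots;   // reserved for replica fetchers
        private final int clientSlots;     // everything else
        private int usedFollowerSlots;
        private int usedClientSlots;

        PartitionedSessionQuota(int totalSlots, int reservedForFollowers) {
            this.followerSlots = reservedForFollowers;
            this.clientSlots = totalSlots - reservedForFollowers;
        }

        /** True if a new incremental fetch session may be cached for this requester. */
        synchronized boolean tryAcquire(boolean isFollower) {
            if (isFollower && usedFollowerSlots < followerSlots) {
                usedFollowerSlots++;
                return true;
            }
            if (!isFollower && usedClientSlots < clientSlots) {
                usedClientSlots++;
                return true;
            }
            return false;   // no slot: the requester keeps using full fetch requests
        }

        synchronized void release(boolean isFollower) {
            if (isFollower) usedFollowerSlots--;
            else usedClientSlots--;
        }
    }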
>>>>>>>>>>>>>>>> I'd rather see a protocol in which the client is hinting to the broker that it is going to use the feature, instead of a client realizing that the broker just offered the feature (regardless of protocol version, which should only indicate that the feature would be usable).
>>>>>>>>>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do think that the server should have the option of not accepting incremental requests from specific clients, in order to save memory space.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This seems to work better with a per connection/session attached Metadata than with a Map, and could allow for easier client implementations. It would also make client-side code easier as there wouldn't be any cache-miss error messages to handle.
>>>>>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree. However, TCP sessions aren't exposed to most of our client-side code. For example, when the Producer creates a message and hands it off to the NetworkClient, the NC will transparently re-connect and re-send a message if the first send failed. The higher-level code will not be informed about whether the TCP session was re-established, whether an existing TCP session was used, and so on. So overall I would still lean towards not coupling this to the TCP session...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thank you again for the KIP. And again, if this was clarified already please drop me a hint where I could read about it.
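For completeness, the cache-miss handling Jan would like to avoid is fairly small on the client side. A sketch (the response fields and the "session unknown" signal are invented here for illustration; the thread had not fixed an error code at this point): when the broker no longer recognizes the session, the client forgets it and its next request is simply a full fetch:

    // Sketch only: client-side reaction when the broker no longer knows our
    // incremental fetch session (evicted, broker restarted, and so on).
    class FetchSessionClientState {
        private int sessionId = 0;   // 0 = no incremental session established
        private int nextEpoch = 0;

        /** Minimal stand-in for the parts of a fetch response the sketch needs. */
        static final class FetchResponse {
            final boolean sessionUnknown;   // broker did not recognize (sessionId, epoch)
            final int sessionIdFromBroker;  // echoed or newly granted session ID, 0 if none

            FetchResponse(boolean sessionUnknown, int sessionIdFromBroker) {
                this.sessionUnknown = sessionUnknown;
                this.sessionIdFromBroker = sessionIdFromBroker;
            }
        }

        boolean nextRequestIsFull() {
            return sessionId == 0;   // no usable session: send the whole partition list
        }

        int epochForNextRequest() {
            return nextRequestIsFull() ? 0 : nextEpoch;
        }

        void onResponse(FetchResponse response) {
            if (response.sessionUnknown) {
                // Cache miss on the broker: start over with a full fetch next time.
                sessionId = 0;
                nextEpoch = 0;
            } else if (response.sessionIdFromBroker != 0) {
                sessionId = response.sessionIdFromBroker;
                nextEpoch++;   // the next incremental request uses the next epoch
            }
        }
    }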
>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and latency of FetchRequest:
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please take a look.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> cheers,
>>>>>>>>>>>>>>>>> Colin