From: Jun Rao
Date: Fri, 8 Dec 2017 16:56:43 -0800
Subject: Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
To: dev@kafka.apache.org

Hi, Jiangjie,

What I described is almost the same as yours. The only extra thing is to scan the log segment from the identified index entry a bit more, to find a file position that ends at a message set boundary and is less than the partition-level fetch size. This way, we still preserve the current semantics of not returning more bytes than the fetch size, unless there is a single message set larger than the fetch size.

In a typical cluster at LinkedIn, what's the percentage of idle partitions?

Thanks,

Jun

On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin wrote:

> Hi Jun,
>
> Yes, we still need to handle the corner case. And you are right, it is all about the trade-off between simplicity and the performance gain.
>
> I was thinking that the brokers always return at least log.index.interval.bytes per partition to the consumer, just like we will return at least one message to the user. This way we don't need to worry about the case where the fetch size is smaller than the index interval. We may just need to let users know about this behavior change.
>
> Not sure if I completely understand your solution, but I think we are thinking about the same thing, i.e. for the first fetch asking for offset x0, we will need to do a binary search to find the position p0. Then the broker will iterate over the index entries, starting from the first index entry after position p0, until it reaches the index entry (x1, p1) such that p1 - p0 is just under the fetch size but the next entry would exceed it. We then return the bytes from p0 to p1. Meanwhile, the broker caches the next fetch (x1, p1), so when the next fetch comes, it just iterates over the offset index entries starting at (x1, p1).
>
> It is true that in the above approach, log compacted topics need to be handled. It seems this can be solved by checking whether the cached index and the new log index are still the same index object. If they are not the same, we can fall back to a binary search with the cached offset. It is admittedly more complicated than the current logic, but given that the binary search logic already exists, the additional object sanity check does not seem like too much work.
>
> Not sure if the above implementation is simple enough to justify the performance improvement. Let me know if you see potential complexity.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Dec 6, 2017 at 4:48 PM, Jun Rao wrote:
>
> > Hi, Becket,
> >
> > Yes, I agree that it's rare for the fetch size to be smaller than the index interval. It's just that we still need additional code to handle the rare case.
> >
> > If you go this far, a more general approach (i.e., without returning at the index boundary) is the following. We can cache the following metadata for the next fetch offset: the file position in the log segment, and the first index slot at or after that file position. When serving a fetch request, we scan the index entries from the cached index slot until we hit the fetch size. We can then send the data at the message set boundary and update the cached metadata for the next fetch offset. This is kind of complicated, but probably no more so than your approach if the corner case has to be handled.
> >
> > In both the above approach and your approach, we need additional logic to handle compacted topics, since a log segment (and therefore its index) can be replaced between two consecutive fetch requests.
> >
> > Overall, I agree that the general approach you proposed applies more widely, since we get the benefit even when all topics are high volume. It's just that it would be better if we could think of a simpler implementation.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin wrote:
> >
> > > Hi Jun,
> > >
> > > That is true, but in reality it seems rare that the fetch size is smaller than the index interval. In the worst case, we may need to do another lookup. In the future, when we have a mechanism to inform the clients about the broker configurations, the clients may want to configure themselves correspondingly as well, e.g. max message size, max timestamp difference, etc.
> > >
> > > On the other hand, we are not guaranteeing that the returned bytes in a partition are always bounded by the per-partition fetch size, because we are going to return at least one message. So the per-partition fetch size already seems to be a soft limit. Since we are introducing a new fetch protocol and this is related, it might be worth considering this option.
> > >
> > > BTW, one reason I bring this up again is that yesterday we had a presentation from Uber regarding end-to-end latency, and they are seeing this binary search behavior impact latency due to page in/out of the index file.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao wrote:
> > >
> > > > Hi, Jiangjie,
> > > >
> > > > I am not sure that returning the fetch response at the index boundary is a general solution. The index interval is configurable. If one configures the index interval to be larger than the per-partition fetch size, we probably have to return data not at the index boundary.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
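(To make the scheme Becket and Jun are converging on above concrete, here is a minimal Java sketch: cache an (offset, file position) pair from the previous fetch, scan the offset index linearly on the next fetch, and fall back to a binary search only on a cache miss. All class and method names are invented for illustration; this is not Kafka's actual OffsetIndex/LogSegment code.)

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: serve fetches from a cached (offset, file position)
// pair, binary-searching (floorEntry) only on a cache miss.
class IndexScanFetchPlanner {
    // Sparse offset index: message offset -> byte position in the log segment.
    private final NavigableMap<Long, Long> index = new TreeMap<>();
    // Cached start point for the next expected fetch, i.e. (x1, p1) above.
    private long cachedOffset = -1L;
    private long cachedPosition = -1L;

    // Returns {startPos, endPos}, the byte range to return for this fetch.
    long[] plan(long fetchOffset, int fetchSize, long logEndPosition) {
        long startPos;
        if (fetchOffset == cachedOffset && cachedPosition >= 0L) {
            startPos = cachedPosition;                  // fast path: linear access
        } else {
            Map.Entry<Long, Long> entry = index.floorEntry(fetchOffset);
            startPos = (entry == null) ? 0L : entry.getValue();  // cache miss
        }
        // Iterate index entries until the next one would exceed the fetch size.
        long endOffset = fetchOffset;
        long endPos = startPos;
        for (Map.Entry<Long, Long> e : index.tailMap(fetchOffset, false).entrySet()) {
            if (e.getValue() - startPos > fetchSize) {
                break;
            }
            endOffset = e.getKey();
            endPos = e.getValue();
        }
        // Mirror "return at least one message": if even one index interval is
        // larger than fetchSize, return up to fetchSize (or the log end).
        if (endPos == startPos) {
            endPos = Math.min(startPos + fetchSize, logEndPosition);
        }
        cachedOffset = endOffset;                       // remember (x1, p1)
        cachedPosition = endPos;
        return new long[] {startPos, endPos};
    }
}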
> > > > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin wrote:
> > > >
> > > > > Hi Colin,
> > > > >
> > > > > Thinking about this again, I do see the reason that we want to have an epoch to avoid out-of-order registration of the interested set. But I am wondering if the following semantics would meet what we want better:
> > > > > - Session Id: the id assigned to a single client for its lifetime, i.e. it does not change when the interested partitions change.
> > > > > - Epoch: the interested-set epoch, only updated when a full fetch request comes, which may result in a change to the interested partition set.
> > > > > This will ensure that the registered interested set is always the latest registration, and the clients can change the interested partition set without creating another session.
> > > > >
> > > > > Also, I want to bring up again the way the leader responds to the FetchRequest. I think it would be a big improvement if we just returned the responses at an index entry boundary or the log end. There are a few benefits:
> > > > > 1. The leader does not need the follower to provide the offsets.
> > > > > 2. The fetch requests no longer need to do a binary search on the index; they just need to do a linear access to the index file, which is much more cache friendly.
> > > > >
> > > > > Assuming the leader can get the last offsets returned to the clients cheaply, I am still not sure why it is necessary for the followers to repeat the offsets in the incremental fetch every time. Intuitively, they should only update the offsets when the leader has the wrong offsets; in most cases, the incremental fetch request should just be empty. Otherwise we may not be saving much when there are continuous small requests going to each partition, which could be normal for some low-latency systems.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe wrote:
> > > > >
> > > > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > Hi Colin
> > > > > > >
> > > > > > > Addressing the topic of how to manage slots from the other thread. With TCP connections, all of this essentially comes for free.
> > > > > >
> > > > > > Hi Jan,
> > > > > >
> > > > > > I don't think it's accurate to say that cache management "comes for free" by coupling the incremental fetch session with the TCP session. When a new TCP session is started by a fetch request, you still have to decide whether to grant that request an incremental fetch session or not. If your answer is that you always grant the request, I would argue that you do not have cache management.
> > > > > >
> > > > > > I guess you could argue that timeouts are cache management, but I don't find that argument persuasive. Anyone could just create a lot of TCP sessions and use a lot of resources, in that case. So there is essentially no limit on memory use. In any case, TCP sessions don't help us implement fetch session timeouts.
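(Colin's distinction between real cache management and TCP timeouts can be illustrated with a small sketch: a bounded session cache with LRU eviction plus its own expiry clock, entirely independent of connection state. All names, including the FetchSession type, are hypothetical; this is not the actual broker code.)

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of explicit cache management: a bounded session cache
// with LRU eviction, plus a session timeout independent of any TCP state.
class FetchSessionCache {
    static class FetchSession {
        final int id;
        int epoch = 1;              // bumped on each successful incremental fetch
        long lastUsedMs;
        FetchSession(int id, long nowMs) { this.id = id; this.lastUsedMs = nowMs; }
    }

    private final int maxSlots;
    private final LinkedHashMap<Integer, FetchSession> sessions;

    FetchSessionCache(int maxSlots) {
        this.maxSlots = maxSlots;
        // accessOrder=true makes iteration order least-recently-used first.
        this.sessions = new LinkedHashMap<Integer, FetchSession>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, FetchSession> eldest) {
                return size() > FetchSessionCache.this.maxSlots;
            }
        };
    }

    // Granting (or refusing) a slot is the cache-management decision;
    // here the LRU session is simply evicted when all slots are taken.
    synchronized FetchSession createSession(int sessionId, long nowMs) {
        FetchSession session = new FetchSession(sessionId, nowMs);
        sessions.put(sessionId, session);
        return session;
    }

    synchronized FetchSession find(int sessionId) {
        return sessions.get(sessionId);
    }

    // Expire sessions on their own clock (e.g. one or two minutes), far
    // shorter than TCP keepalive or connections.max.idle.ms.
    synchronized void expireSessions(long nowMs, long sessionTimeoutMs) {
        Iterator<FetchSession> it = sessions.values().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().lastUsedMs > sessionTimeoutMs) {
                it.remove();
            }
        }
    }
}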
> > > > > > > I still would argue that we disable it by default and add a flag in the broker to ask the leader to maintain the cache while replicating, and also make it optional in consumers (default to off), so one can turn it on where it really hurts -- MirrorMaker and audit consumers, prominently.
> > > > > >
> > > > > > I agree with Jason's point from earlier in the thread. Adding extra configuration knobs that aren't really necessary can harm usability. Certainly asking people to manually turn on a feature "where it really hurts" seems to fall in that category, when we could easily enable it automatically for them.
> > > > > >
> > > > > > > Otherwise, I left a few remarks in-line, which should help to make my view of the situation clearer.
> > > > > > >
> > > > > > > Best Jan
> > > > > > >
> > > > > > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > > > > > On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > > > > > > Thanks for the explanation, Colin. A few more questions.
> > > > > > > > > > >
> > > > > > > > > > > > The session epoch is not complex. It's just a number which increments on each incremental fetch. The session epoch is also useful for debugging -- it allows you to match up requests and responses when looking at log files.
> > > > > > > > > > >
> > > > > > > > > > > Currently each request in Kafka has a correlation id to help match the requests and responses. Is the epoch doing something different?
> > > > > > > > > >
> > > > > > > > > > Hi Becket,
> > > > > > > > > >
> > > > > > > > > > The correlation ID is used within a single TCP session, to uniquely associate a request with a response. The correlation ID is not unique (and has no meaning) outside the context of that single TCP session.
> > > > > > > > > >
> > > > > > > > > > Keep in mind, NetworkClient is in charge of TCP sessions, and generally tries to hide that information from the upper layers of the code. So when you submit a request to NetworkClient, you don't know if that request creates a TCP session, or reuses an existing one.
> > > > > > > > > >
> > > > > > > > > > > > Unfortunately, this doesn't work. Imagine the client misses an incremental fetch response about a partition, and then the partition is never updated after that. The client has no way to know about the partition, since it won't be included in any future incremental fetch responses. And there are no offsets to compare, since the partition is simply omitted from the response.
> > > > > > > > > > >
> > > > > > > > > > > I am curious in which situation the follower would miss the response for a partition. If the entire FetchResponse is lost (e.g. a timeout), the follower would disconnect and retry. That will result in sending a full FetchRequest.
> > > > > > > > > >
> > > > > > > > > > Basically, you are proposing that we rely on TCP for reliable delivery in a distributed system. That isn't a good idea for a bunch of different reasons. First of all, TCP timeouts tend to be very long. So if the TCP session timing out is your error detection mechanism, you have to wait minutes for messages to time out. Of course, we add a timeout on top of that, after which we declare the connection bad and manually close it. But just because the session is closed on one end doesn't mean that the other end knows that it is closed. So the leader may have to wait quite a long time before TCP decides that yes, connection X from the follower is dead and not coming back, even though gremlins ate the FIN packet which the follower attempted to transmit. If the cache state is tied to that TCP session, we have to keep that cache around for a much longer time than we should.
> > > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I see this from a different perspective. The cache expiry time has the same semantics as the idle connection time in this scenario: it is the time range in which we expect the client to come back and reuse its broker-side state. I would argue that on close we would get an extra shot at cleaning up the session state early, as opposed to always waiting for that duration for expiry to happen.
> > > > > > > >
> > > > > > > > Hi Jan,
> > > > > > > >
> > > > > > > > The idea here is that the incremental fetch cache expiry time can be much shorter than the TCP session timeout. In general the TCP session timeout is common to all TCP connections, and very long. To make these numbers a little more concrete, the TCP session timeout is often configured to be 2 hours on Linux. (See https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html ) The timeout I was proposing for incremental fetch sessions was one or two minutes at most.
> > > > > > >
> > > > > > > Currently this is taken care of by connections.max.idle.ms on the broker, which defaults to a few minutes.
> > > > > >
> > > > > > It is 10 minutes by default, which is longer than what we want the incremental fetch session timeout to be. There's no reason to couple these two things.
> > > > > >
> > > > > > > It is also something we could let the client change if we really wanted to. So there is no need to worry about coupling our implementation to timeouts given by the OS; with TCP, one always has full control over the worst-case times, plus one gets the extra shot at cleaning up early when the close comes through -- which is the majority of the cases.
> > > > > > In the majority of cases, the TCP session will be re-established. In that case, we have to send a full fetch request rather than an incremental fetch request.
> > > > > >
> > > > > > > > > > > > Secondly, from a software engineering perspective, it's not a good idea to try to tightly tie together TCP and our code. We would have to rework how we interact with NetworkClient so that we are aware of things like TCP sessions closing or opening. We would have to be careful to preserve the ordering of incoming messages when doing things like putting incoming requests on to a queue to be processed by multiple threads. It's just a lot of complexity to add, and there's no upside.
> > > > > > > > >
> > > > > > > > > I see the point here, and I had a small chat with Dong Lin already making me aware of this. I tried out the approaches and propose the following:
> > > > > > > > >
> > > > > > > > > The client starts and does a full fetch. It then does incremental fetches. The connection to the broker dies and is re-established by NetworkClient under the hood. The broker sees an incremental fetch without having state => returns an error. The client sees the error, does a full fetch, and goes back to incrementally fetching.
> > > > > > > > >
> > > > > > > > > Having this one additional error round trip is essentially the same as when something with the session or epoch changes unexpectedly for the client (say, expiry). So it is nothing extra added, but the conditions are easier to evaluate -- especially since we do everything with NetworkClient. Other implementers of the protocol are free to optimize this and skip the erroneous round trip on the new connection. It is a great plus that the client can know when the error is going to happen, instead of the server always having to report back when something changes unexpectedly for the client.
> > > > > > > >
> > > > > > > > You are assuming that the leader and the follower agree that the TCP session drops at the same time. When there are network problems, this may not be true. The leader may still think the previous TCP session is active. In that case, we have to keep the incremental fetch session state around until we learn otherwise (which could be up to that 2 hour timeout I mentioned). And if we get a new incoming incremental fetch request, we can't assume that it replaces the previous one, because the IDs will be different (the new one starts a new session).
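(A hypothetical continuation of the FetchSessionCache sketch above, illustrating the reconnect case Colin describes: a request arriving with an unknown session id, or with an unexpected epoch, gets an error, and the client recovers by sending a full fetch. The error names are illustrative, not necessarily the final KIP-227 ones.)

// Hypothetical broker-side validation of an incremental fetch request,
// built on the FetchSessionCache sketch above.
class FetchSessionValidator {
    enum FetchError { NONE, FETCH_SESSION_ID_NOT_FOUND, INVALID_FETCH_SESSION_EPOCH }

    FetchError validateIncremental(FetchSessionCache cache, int sessionId,
                                   int requestEpoch, long nowMs) {
        FetchSessionCache.FetchSession session = cache.find(sessionId);
        if (session == null) {
            // Expired, evicted, or never created on this broker: the client
            // must recover by sending a full fetch request.
            return FetchError.FETCH_SESSION_ID_NOT_FOUND;
        }
        if (requestEpoch != session.epoch + 1) {
            // A response was lost, or a request was duplicated or reordered;
            // resync with a full fetch rather than serving stale state.
            return FetchError.INVALID_FETCH_SESSION_EPOCH;
        }
        session.epoch = requestEpoch;   // accept and advance the epoch
        session.lastUsedMs = nowMs;
        return FetchError.NONE;
    }
}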
> > > > > > > As mentioned, there is no reason to fear some timeouts that are out of our control.
> > > > > >
> > > > > > > > > > > > Imagine that I made an argument that client IDs are "complex" and should be removed from our APIs. After all, we can just look at the remote IP address and TCP port of each connection. Would you think that was a good idea? The client ID is useful when looking at logs. For example, if a rebalance is having problems, you want to know what clients were having a problem. So having the client ID field to guide you is actually much less "complex" in practice than not having an ID.
> > > > > > > > >
> > > > > > > > > I still can't follow why the correlation id will not help here. Correlating logs with it usually works great, even with primitive tools like grep.
> > > > > > > >
> > > > > > > > The correlation ID does help somewhat, but certainly not as much as a unique 64-bit ID. The correlation ID is not unique in the broker, just unique to a single NetworkClient. Similarly, the correlation ID is not unique on the client side if there are multiple Consumers, etc.
> > > > > > >
> > > > > > > We can always bump the entropy in correlation IDs; I have never had a problem with finding too many duplicates. That would be a different KIP though.
> > > > > >
> > > > > > > > > > > > Similarly, if metadata responses had epoch numbers (simple incrementing numbers), we would not have to debug problems like clients accidentally getting old metadata from servers that had been partitioned off from the network for a while. Clients would know the difference between old and new metadata. So putting epochs into the metadata request is much less "complex" operationally, even though it's an extra field in the request. This has been discussed before on the mailing list.
> > > > > > > > > > > >
> > > > > > > > > > > > So I think the bottom line for me is that having the session ID and session epoch, while it adds two extra fields, reduces operational complexity and increases debuggability. It avoids tightly coupling us to assumptions about reliable ordered delivery, which tend to be violated in practice in multiple layers of the stack. Finally, it avoids the necessity of refactoring NetworkClient.
> > > > > > > > >
> > > > > > > > > So there are stacks out there that violate TCP guarantees? And software still works? How can this be? Can you elaborate a little on where this can be violated? I am not very familiar with virtualized environments, but they can't really violate TCP contracts.
> > > > > > > >
> > > > > > > > TCP's guarantees of reliable, in-order transmission certainly can be violated. For example, I once had to debug a cluster where a certain node had a network card which corrupted its transmissions occasionally. With all the layers of checksums, you would think that this was not possible, but it happened. We occasionally got corrupted data written to disk on the other end because of it. Even more frustrating, the data was not corrupted on disk on the sending node -- it was a bug in the network card driver that was injecting the errors.
> > > > > > >
> > > > > > > true, but your broker might as well read a corrupted 600GB as the size from the network and die with an OOM instantly.
> > > > > >
> > > > > > If you read 600 GB as the size from the network, you will not "die with OOM instantly." That would be a bug. Instead, you will notice that 600 GB is greater than max.message.bytes, and close the connection.
> > > > > >
> > > > > > > Optimizing for still having functional software under these circumstances is not reasonable. You want to get rid of such a node ASAP and pray that ZooKeeper's ticks get corrupted often enough that it finally drops out of the cluster.
> > > > > > >
> > > > > > > There is a good reason that these kinds of things ( https://issues.apache.org/jira/browse/MESOS-4105 ) don't end up as Kafka JIRAs. In the end you can't run any software in these containers anymore. Application-layer checksums are a neat thing to fail fast on, but trying to cope with this probably causes more harm than good. So I would argue that we shouldn't try this for the fetch requests.
> > > > > >
> > > > > > One of the goals of Apache Kafka is to be "a streaming platform... [that] lets you store streams of records in a fault-tolerant way." For more information, see https://kafka.apache.org/intro . Fault tolerance is explicitly part of the goal of Kafka. Prayer should be optional, not required, when running the software.
> > > > > >
> > > > > > Crashing because someone sent you a bad packet is not reasonable behavior. It is a bug. Similarly, bringing down the whole cluster, which could be a hundred nodes, because someone had a bad network adapter is not reasonable behavior. It is perhaps reasonable for the cluster to perform worse when hardware is having problems. But that's a different discussion.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > > > However, my point was not about TCP's guarantees being violated. My point is that TCP's guarantees are only one small building block for building a robust distributed system. TCP basically just says that if you get any bytes from the stream, you will get the ones that were sent by the sender, in the order they were sent. TCP does not guarantee that the bytes you send will get there. It does not guarantee that if you close the connection, the other end will know about it in a timely fashion.
> > > > > > > These are very powerful guarantees, and since we use TCP we should piggyback everything that is reasonable onto it. IMO there is no need to reimplement correct sequencing if you get that from your transport layer. It saves you the complexity, it makes your application behave much more naturally, and it makes your API easier to understand.
> > > > > > >
> > > > > > > There is literally nothing the kernel won't let you decide, especially not any timings. The only notable exception is TIME_WAIT, of usually 240 seconds, but that already has little to do with the broker itself, and if we are running out of usable ports because of this, then expiring fetch requests won't help much anyway.
> > > > > > >
> > > > > > > I hope I could strengthen the trust you have in userland TCP connection management. It is really powerful and can be exploited for maximum gain without much risk, in my opinion.
> > > > > > >
> > > > > > > > It does not guarantee that the bytes will be received in a certain timeframe, and certainly doesn't guarantee that if you send a byte on connection X and then on connection Y, the remote end will read a byte on X before reading a byte on Y.
> > > > > > >
> > > > > > > No one expects this from two independent paths of any kind.
> > > > > > >
> > > > > > > > best,
> > > > > > > > Colin
> > > > > > > >
> > > > > > > > > Hope this made my view clearer, especially the first part.
> > > > > > > > >
> > > > > > > > > Best Jan
> > > > > > > > >
> > > > > > > > > > best,
> > > > > > > > > > Colin
> > > > > > > > > >
> > > > > > > > > > > If an error such as NotLeaderForPartition is returned for some partitions, the follower can always send a full FetchRequest. Is there a scenario in which only some of the partitions in a FetchResponse are lost?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe wrote:
> > > > > > > > > > >
> > > > > > > > > > > > On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > > > > > > > > On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe wrote:
> > > > > > > > > > > > > > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > > > > > > > > > > > Hey Colin,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks much for the update. I have a few questions below:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. I am not very sure that we need the Fetch Session Epoch. It seems that the Fetch Session Epoch is only needed to help the leader distinguish between "a full fetch request" and "a full fetch request and request a new incremental fetch session". Alternatively, the follower can also indicate "a full fetch request and request a new incremental fetch session" by setting the Fetch Session ID to -1, without using the Fetch Session Epoch. Does this make sense?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The fetch session epoch is very important for ensuring correctness. It prevents corrupted or incomplete fetch data due to network reordering or loss.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > For example, consider a scenario where the follower sends a fetch request to the leader. The leader responds, but the response is lost because of network problems which affected the TCP session. In that case, the follower must establish a new TCP session and re-send the incremental fetch request. But the leader does not know that the follower didn't receive the previous incremental fetch response. It is only the incremental fetch epoch which lets the leader know that it needs to resend that data, and not data which comes afterwards.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > You could construct similar scenarios with message reordering, duplication, etc. Basically, this is a stateful protocol on an unreliable network, and you need to know whether the follower got the previous data you sent before you move on. And you need to handle issues like duplicated or delayed requests. These issues do not affect the full fetch request, because it is not stateful -- any full fetch request can be understood and properly responded to in isolation.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the explanation. This makes sense. On the other hand, I would be interested in learning more about whether Becket's solution can help simplify the protocol by not having the echo field, and whether that is worth doing.
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > >
> > > > > > > > > > > > I commented about this in the other thread. A solution which doesn't maintain session information doesn't work here.
> > > > > > > > > > > > > > > 2. It is said that the Incremental FetchRequest will include partitions whose fetch offset or maximum number of fetch bytes has changed. If the follower's logStartOffset of a partition has changed, should this partition also be included in the next FetchRequest to the leader? Otherwise, it may affect the handling of DeleteRecordsRequest, because the leader may not know that the corresponding data has been deleted on the follower.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yeah, the follower should include the partition if the logStartOffset has changed. That should be spelled out in the KIP. Fixed.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 3. In the section "Per-Partition Data", a partition is not considered dirty if its log start offset has changed. Later, in the section "FetchRequest Changes", it is said that incremental fetch responses will include a partition if its logStartOffset has changed. It seems inconsistent. Can you update the KIP to clarify it?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In the "Per-Partition Data" section, it does say that logStartOffset changes make a partition dirty, though, right? The first bullet point is:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > * The LogCleaner deletes messages, and this changes the log start offset of the partition on the leader, or
> > > > > > > > > > > > >
> > > > > > > > > > > > > Ah, I see. I think I didn't notice this because the statement assumes that the LogStartOffset on the leader only changes due to the LogCleaner. In fact, the LogStartOffset can change on the leader due to either log retention or DeleteRecordsRequest. I haven't verified whether the LogCleaner can change the LogStartOffset though. It may be a bit better to just say that a partition is considered dirty if the LogStartOffset changes.
> > > > > > > > > > > >
> > > > > > > > > > > > I agree. It should be straightforward to just resend the partition if the logStartOffset changes.
> > > > > > > > > > > >
> > > > > > > > > > > > > > > 4. In the "Fetch Session Caching" section, it is said that each broker has a limited number of slots. How is this number determined? Does this require a new broker config for this number?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Good point. I added two broker configuration parameters to control this number.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > I am curious to see whether we can avoid some of these new configs. For example, incremental.fetch.session.cache.slots.per.broker is probably not necessary, because if a leader knows that a FetchRequest comes from a follower, we probably want the leader to always cache the information from that follower. Does this make sense?
> > > > > > > > > > > >
> > > > > > > > > > > > Yeah, maybe we can avoid having incremental.fetch.session.cache.slots.per.broker.
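(As a hypothetical illustration of the "dirty partition" rule being settled here: a partition would be included in an incremental fetch response only if something about it changed since the last response. The field names are invented, not the KIP's final schema.)

// Hypothetical per-partition state kept by the leader for one fetch session.
class PartitionFetchState {
    long lastReturnedLogStartOffset;
    long lastReturnedHighWatermark;
    long lastReturnedEndOffset;     // last offset already sent to this fetcher

    boolean isDirty(long logStartOffset, long highWatermark, long logEndOffset) {
        return logStartOffset != lastReturnedLogStartOffset  // retention, cleaner,
                                                             // or DeleteRecords
                || highWatermark != lastReturnedHighWatermark
                || logEndOffset > lastReturnedEndOffset;     // new data to send
    }
}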
> > > > > > > > > > > > > Maybe we can discuss the config later, after there is agreement on what the protocol would look like.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > What is the error code if the broker does not have a new log for the incoming FetchRequest?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hmm, is there a typo in this question? Maybe you meant to ask what happens if there is no new cache slot for the incoming FetchRequest? That's not an error -- the incremental fetch session ID just gets set to 0, indicating no incremental fetch session was created.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yeah, there is a typo. You have answered my question.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > 5. Can you clarify what happens if the follower adds a partition to the ReplicaFetcherThread after receiving a LeaderAndIsrRequest? Does the leader need to generate a new session for this ReplicaFetcherThread, or does it re-use the existing session? If it uses a new session, is the old session actively deleted from the slot?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The basic idea is that you can't make changes, except by sending a full fetch request. However, perhaps we can allow the client to re-use its existing session ID. If the client sets sessionId = id, epoch = 0, it could re-initialize the session.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yeah, I agree with the basic idea. We probably want to understand in more detail how this works later.
> > > > > > > > > > > >
> > > > > > > > > > > > Sounds good. I updated the KIP with this information. A re-initialization should be exactly the same as an initialization, except that it reuses an existing ID.
> > > > > > > > > > > >
> > > > > > > > > > > > best,
> > > > > > > > > > > > Colin
> > > > > > > > > > > >
> > > > > > > > > > > > > > > BTW, I think it may be useful if the KIP can include an example workflow of how this feature will be used in case of a partition change and so on.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yeah, that might help.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I updated the KIP with the ideas we've been discussing.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > > > Colin
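(A hypothetical sketch of how a broker might classify the (sessionId, epoch) combinations discussed above, including Colin's "sessionId = id, epoch = 0" re-initialization idea. Purely illustrative; the final KIP-227 semantics may differ.)

// Hypothetical classification of fetch requests by (sessionId, epoch).
class FetchRequestClassifier {
    static final int NO_SESSION_ID = 0;

    static String classify(int sessionId, int epoch) {
        if (sessionId == NO_SESSION_ID) {
            return "full fetch; request a new incremental fetch session";
        } else if (epoch == 0) {
            return "full fetch; re-initialize existing session " + sessionId;
        } else {
            return "incremental fetch on session " + sessionId + " at epoch " + epoch;
        }
    }
}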
> > > > > > > > > > > > > > > > On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > > > > > > > > > > > > > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > > > > > > > > > > > > > Hi Colin, thank you for this KIP. It can become a really useful thing.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I just scanned through the discussion so far and wanted to start a thread to make a decision about keeping the cache with the Connection / Session, or having some sort of UUID-indexed global Map.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Sorry if that has been settled already and I missed it. In that case, could anyone point me to the discussion?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I don't think anyone has discussed the idea of tying the cache to an individual TCP session yet. I agree that since the cache is intended to be used only by a single follower or client, it's an interesting thing to think about.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I guess the obvious disadvantage is that whenever your TCP session drops, you have to make a full fetch request rather than an incremental one. It's not clear to me how often this happens in practice -- it probably depends a lot on the quality of the network. From a code perspective, it might also be a bit difficult to access data associated with the Session from classes like KafkaApis (although we could refactor it to make this easier).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It's also clear that even if we tie the cache to the session, we still have to have limits on the number of caches we're willing to create. And probably we should reserve some cache slots for each follower, so that clients don't take all of them.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'd rather see a protocol in which the client hints to the broker that it is going to use the feature, instead of the client realizing that the broker just offered the feature (regardless of protocol version, which should only indicate that the feature would be usable).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hmm. I'm not sure what you mean by "hinting." I do think that the server should have the option of not accepting incremental requests from specific clients, in order to save memory space.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > This seems to work better with per-connection/session-attached metadata than with a Map, and could allow for easier client implementations. It would also make client-side code easier, as there wouldn't be any cache-miss error messages to handle.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It is nice not to have to handle cache-miss responses, I agree. However, TCP sessions aren't exposed to most of our client-side code. For example, when the Producer creates a message and hands it off to the NetworkClient, the NC will transparently re-connect and re-send the message if the first send failed. The higher-level code will not be informed about whether the TCP session was re-established, whether an existing TCP session was used, and so on. So overall, I would still lean towards not coupling this to the TCP session...
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thank you again for the KIP. And again, if this was clarified already, please drop me a hint where I could read about it.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Best Jan
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I created a KIP to improve the scalability and latency of FetchRequest: https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Please take a look.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > cheers,
> > > > > > > > > > > > > > > > > > > Colin