From: Jason Gustafson
Date: Wed, 6 Dec 2017 09:32:58 -0800
Subject: Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
To: dev@kafka.apache.org

> Thinking about this again. I do see the reason that we want to have an
> epoch to avoid out-of-order registration of the interested set. But I am
> wondering if the following semantic would meet what we want better:
> - Session Id: the id assigned to a single client for its whole lifetime,
>   i.e. it does not change when the interested partitions change.
> - Epoch: the interested-set epoch. Only updated when a full fetch request
>   comes, which may result in the interested partition set changing.
> This will ensure that the registered interested set is always the latest
> registration, and the clients can change the interested partition set
> without creating another session.

I agree this is a bit more intuitive than the sequence number, and the
ability to reuse the session is beneficial since it wastes less of the
cache on session timeouts. I would say that the epoch should be controlled
by the client, and a bump of the epoch indicates a full fetch request. The
client should also bump the epoch if it fails to receive a fetch response.
This ensures that the broker cannot receive an old request after the client
has reconnected and sent a new one, which could otherwise leave the session
in an invalid state.

-Jason

On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin wrote:

> Hi Jun,
>
> That is true, but in reality it seems rare that the fetch size is smaller
> than the index interval. In the worst case, we may need to do another
> lookup. In the future, when we have a mechanism to inform the clients
> about the broker configurations, the clients may want to configure
> themselves correspondingly as well, e.g. max message size, max timestamp
> difference, etc.
>
> On the other hand, we are not guaranteeing that the bytes returned for a
> partition are always bounded by the per-partition fetch size, because we
> are going to return at least one message, so the per-partition fetch size
> is already a soft limit. Since we are introducing a new fetch protocol and
> this is related, it might be worth considering this option.
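To make the session-id/epoch semantics proposed at the top of this message
concrete, here is a minimal, illustrative sketch of the broker-side check.
It follows the semantics described in this thread (the epoch is controlled
by the client and bumped only on a full fetch); the class, method, and
outcome names are invented for the example and are not the KIP's actual
implementation.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only -- names are made up, not from the KIP.
    public class EpochCheckSketch {
        enum Outcome { NEW_REGISTRATION, INCREMENTAL_OK, STALE_EPOCH, UNKNOWN_SESSION }

        // Last epoch registered by each long-lived session id.
        private final Map<Long, Integer> epochBySession = new HashMap<>();

        public synchronized Outcome handle(long sessionId, int epoch, boolean fullFetch) {
            Integer current = epochBySession.get(sessionId);
            if (current == null) {
                if (!fullFetch) return Outcome.UNKNOWN_SESSION;
                epochBySession.put(sessionId, epoch);   // first registration of this session
                return Outcome.NEW_REGISTRATION;
            }
            if (fullFetch) {
                if (epoch <= current) return Outcome.STALE_EPOCH;  // out-of-order registration, reject
                epochBySession.put(sessionId, epoch);               // replace the interested set
                return Outcome.NEW_REGISTRATION;
            }
            // An incremental fetch must carry the currently registered epoch, so a
            // request sent before the client's last full fetch cannot corrupt state.
            return epoch == current ? Outcome.INCREMENTAL_OK : Outcome.STALE_EPOCH;
        }
    }

Rejecting a full fetch with epoch <= current is exactly the out-of-order
registration case mentioned above: a delayed registration from before the
client's latest one can never overwrite the newest interested set.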
> > BTW, one reason I bring this up again was because yesterday we had a > presentation from Uber regarding the end to end latency. And they are > seeing this binary search behavior impacting the latency due to page in/out > of the index file. > > Thanks, > > Jiangjie (Becket) Qin > > > > On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao wrote: > > > Hi, Jiangjie, > > > > Not sure returning the fetch response at the index boundary is a general > > solution. The index interval is configurable. If one configures the index > > interval larger than the per partition fetch size, we probably have to > > return data not at the index boundary. > > > > Thanks, > > > > Jun > > > > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin wrote: > > > > > Hi Colin, > > > > > > Thinking about this again. I do see the reason that we want to have a > > epoch > > > to avoid out of order registration of the interested set. But I am > > > wondering if the following semantic would meet what we want better: > > > - Session Id: the id assigned to a single client for life long time. > i.e > > > it does not change when the interested partitions change. > > > - Epoch: the interested set epoch. Only updated when a full fetch > > request > > > comes, which may result in the interested partition set change. > > > This will ensure that the registered interested set will always be the > > > latest registration. And the clients can change the interested > partition > > > set without creating another session. > > > > > > Also I want to bring up the way the leader respond to the FetchRequest > > > again. I think it would be a big improvement if we just return the > > > responses at index entry boundary or log end. There are a few benefits: > > > 1. The leader does not need the follower to provide the offsets, > > > 2. The fetch requests no longer need to do a binary search on the > index, > > it > > > just need to do a linear access to the index file, which is much cache > > > friendly. > > > > > > Assuming the leader can get the last returned offsets to the clients > > > cheaply, I am still not sure why it is necessary for the followers to > > > repeat the offsets in the incremental fetch every time. Intuitively it > > > should only update the offsets when the leader has wrong offsets, in > most > > > cases, the incremental fetch request should just be empty. Otherwise we > > may > > > not be saving much when there are continuous small requests going to > each > > > partition, which could be normal for some low latency systems. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe > wrote: > > > > > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote: > > > > > Hi Colin > > > > > > > > > > Addressing the topic of how to manage slots from the other thread. > > > > > With tcp connections all this comes for free essentially. > > > > > > > > Hi Jan, > > > > > > > > I don't think that it's accurate to say that cache management "comes > > for > > > > free" by coupling the incremental fetch session with the TCP session. > > > > When a new TCP session is started by a fetch request, you still have > to > > > > decide whether to grant that request an incremental fetch session or > > > > not. If your answer is that you always grant the request, I would > > argue > > > > that you do not have cache management. > > > > > > > > I guess you could argue that timeouts are cache management, but I > don't > > > > find that argument persuasive. 
Anyone could just create a lot of TCP > > > > sessions and use a lot of resources, in that case. So there is > > > > essentially no limit on memory use. In any case, TCP sessions don't > > > > help us implement fetch session timeouts. > > > > > > > > > I still would argue we disable it by default and make a flag in the > > > > > broker to ask the leader to maintain the cache while replicating > and > > > > also only > > > > > have it optional in consumers (default to off) so one can turn it > on > > > > > where it really hurts. MirrorMaker and audit consumers > prominently. > > > > > > > > I agree with Jason's point from earlier in the thread. Adding extra > > > > configuration knobs that aren't really necessary can harm usability. > > > > Certainly asking people to manually turn on a feature "where it > really > > > > hurts" seems to fall in that category, when we could easily enable it > > > > automatically for them. > > > > > > > > > > > > > > Otherwise I left a few remarks in-line, which should help to > > understand > > > > > my view of the situation better > > > > > > > > > > Best Jan > > > > > > > > > > > > > > > On 05.12.2017 08:06, Colin McCabe wrote: > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > > > > > >> > > > > > >> On 03.12.2017 21:55, Colin McCabe wrote: > > > > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > > > > > >>>> Thanks for the explanation, Colin. A few more questions. > > > > > >>>> > > > > > >>>>> The session epoch is not complex. It's just a number which > > > > increments > > > > > >>>>> on each incremental fetch. The session epoch is also useful > > for > > > > > >>>>> debugging-- it allows you to match up requests and responses > > when > > > > > >>>>> looking at log files. > > > > > >>>> Currently each request in Kafka has a correlation id to help > > match > > > > the > > > > > >>>> requests and responses. Is epoch doing something differently? > > > > > >>> Hi Becket, > > > > > >>> > > > > > >>> The correlation ID is used within a single TCP session, to > > uniquely > > > > > >>> associate a request with a response. The correlation ID is not > > > > unique > > > > > >>> (and has no meaning) outside the context of that single TCP > > > session. > > > > > >>> > > > > > >>> Keep in mind, NetworkClient is in charge of TCP sessions, and > > > > generally > > > > > >>> tries to hide that information from the upper layers of the > code. > > > So > > > > > >>> when you submit a request to NetworkClient, you don't know if > > that > > > > > >>> request creates a TCP session, or reuses an existing one. > > > > > >>>>> Unfortunately, this doesn't work. Imagine the client misses > an > > > > > >>>>> increment fetch response about a partition. And then the > > > > partition is > > > > > >>>>> never updated after that. The client has no way to know > about > > > the > > > > > >>>>> partition, since it won't be included in any future > incremental > > > > fetch > > > > > >>>>> responses. And there are no offsets to compare, since the > > > > partition is > > > > > >>>>> simply omitted from the response. > > > > > >>>> I am curious about in which situation would the follower miss > a > > > > response > > > > > >>>> of a partition. If the entire FetchResponse is lost (e.g. > > > timeout), > > > > the > > > > > >>>> follower would disconnect and retry. That will result in > > sending a > > > > full > > > > > >>>> FetchRequest. 
> > > > > >>> Basically, you are proposing that we rely on TCP for reliable > > > > delivery > > > > > >>> in a distributed system. That isn't a good idea for a bunch of > > > > > >>> different reasons. First of all, TCP timeouts tend to be very > > > > long. So > > > > > >>> if the TCP session timing out is your error detection > mechanism, > > > you > > > > > >>> have to wait minutes for messages to timeout. Of course, we > add > > a > > > > > >>> timeout on top of that after which we declare the connection > bad > > > and > > > > > >>> manually close it. But just because the session is closed on > one > > > end > > > > > >>> doesn't mean that the other end knows that it is closed. So > the > > > > leader > > > > > >>> may have to wait quite a long time before TCP decides that yes, > > > > > >>> connection X from the follower is dead and not coming back, > even > > > > though > > > > > >>> gremlins ate the FIN packet which the follower attempted to > > > > translate. > > > > > >>> If the cache state is tied to that TCP session, we have to keep > > > that > > > > > >>> cache around for a much longer time than we should. > > > > > >> Hi, > > > > > >> > > > > > >> I see this from a different perspective. The cache expiry time > > > > > >> has the same semantic as idle connection time in this scenario. > > > > > >> It is the time range we expect the client to come back an reuse > > > > > >> its broker side state. I would argue that on close we would get > an > > > > > >> extra shot at cleaning up the session state early. As opposed to > > > > > >> always wait for that duration for expiry to happen. > > > > > > Hi Jan, > > > > > > > > > > > > The idea here is that the incremental fetch cache expiry time can > > be > > > > > > much shorter than the TCP session timeout. In general the TCP > > > session > > > > > > timeout is common to all TCP connections, and very long. To make > > > these > > > > > > numbers a little more concrete, the TCP session timeout is often > > > > > > configured to be 2 hours on Linux. (See > > > > > > https://www.cyberciti.biz/tips/linux-increasing-or- > > > > decreasing-tcp-sockets-timeouts.html > > > > > > ) The timeout I was proposing for incremental fetch sessions was > > one > > > > or > > > > > > two minutes at most. > > > > > Currently this is taken care of by > > > > > connections.max.idle.ms on the broker and defaults to something of > > few > > > > > minutes. > > > > > > > > It is 10 minutes by default, which is longer than what we want the > > > > incremental fetch session timeout to be. There's no reason to couple > > > > these two things. > > > > > > > > > Also something we could let the client change if we really wanted > to. > > > > > So there is no need to worry about coupling our implementation to > > some > > > > > timeouts given by the OS, with TCP one always has full control over > > the > > > > worst > > > > > times + one gets the extra shot cleaning up early when the close > > comes > > > > through. > > > > > Which is the majority of the cases. > > > > > > > > In the majority of cases, the TCP session will be re-established. In > > > > that case, we have to send a full fetch request rather than an > > > > incremental fetch request. > > > > > > > > > > > > > > > > > > > > >>> Secondly, from a software engineering perspective, it's not a > > good > > > > idea > > > > > >>> to try to tightly tie together TCP and our code. 
We would have > > to > > > > > >>> rework how we interact with NetworkClient so that we are aware > of > > > > things > > > > > >>> like TCP sessions closing or opening. We would have to be > > careful > > > > > >>> preserve the ordering of incoming messages when doing things > like > > > > > >>> putting incoming requests on to a queue to be processed by > > multiple > > > > > >>> threads. It's just a lot of complexity to add, and there's no > > > > upside. > > > > > >> I see the point here. And I had a small chat with Dong Lin > already > > > > > >> making me aware of this. I tried out the approaches and propose > > the > > > > > >> following: > > > > > >> > > > > > >> The client start and does a full fetch. It then does incremental > > > > fetches. > > > > > >> The connection to the broker dies and is re-established by > > > > NetworkClient > > > > > >> under the hood. > > > > > >> The broker sees an incremental fetch without having state => > > returns > > > > > >> error: > > > > > >> Client sees the error, does a full fetch and goes back to > > > > incrementally > > > > > >> fetching. > > > > > >> > > > > > >> having this 1 additional error round trip is essentially the > same > > as > > > > > >> when something > > > > > >> with the sessions or epoch changed unexpectedly to the client > (say > > > > > >> expiry). > > > > > >> > > > > > >> So its nothing extra added but the conditions are easier to > > > evaluate. > > > > > >> Especially since we do everything with NetworkClient. Other > > > > implementers > > > > > >> on the > > > > > >> protocol are free to optimizes this and do not do the errornours > > > > > >> roundtrip on the > > > > > >> new connection. > > > > > >> Its a great plus that the client can know when the error is > gonna > > > > > >> happen. instead of > > > > > >> the server to always have to report back if something changes > > > > > >> unexpectedly for the client > > > > > > You are assuming that the leader and the follower agree that the > > TCP > > > > > > session drops at the same time. When there are network problems, > > > this > > > > > > may not be true. The leader may still think the previous TCP > > session > > > > is > > > > > > active. In that case, we have to keep the incremental fetch > > session > > > > > > state around until we learn otherwise (which could be up to that > 2 > > > hour > > > > > > timeout I mentioned). And if we get a new incoming incremental > > fetch > > > > > > request, we can't assume that it replaces the previous one, > because > > > the > > > > > > IDs will be different (the new one starts a new session). > > > > > As mentioned, no reason to fear some time-outs out of our control > > > > > > > > > > > >>> Imagine that I made an argument that client IDs are "complex" > and > > > > should > > > > > >>> be removed from our APIs. After all, we can just look at the > > > remote > > > > IP > > > > > >>> address and TCP port of each connection. Would you think that > > was > > > a > > > > > >>> good idea? The client ID is useful when looking at logs. For > > > > example, > > > > > >>> if a rebalance is having problems, you want to know what > clients > > > were > > > > > >>> having a problem. So having the client ID field to guide you > is > > > > > >>> actually much less "complex" in practice than not having an ID. > > > > > >> I still cant follow why the correlation idea will not help here. > > > > > >> Correlating logs with it usually works great. 
Even with > primitive > > > > tools > > > > > >> like grep > > > > > > The correlation ID does help somewhat, but certainly not as much > > as a > > > > > > unique 64-bit ID. The correlation ID is not unique in the > broker, > > > just > > > > > > unique to a single NetworkClient. Simiarly, the correlation ID > is > > > not > > > > > > unique on the client side, if there are multiple Consumers, etc. > > > > > Can always bump entropy in correlation IDs, never had a problem > > > > > of finding to many duplicates. Would be a different KIP though. > > > > > > > > > > > >>> Similarly, if metadata responses had epoch numbers (simple > > > > incrementing > > > > > >>> numbers), we would not have to debug problems like clients > > > > accidentally > > > > > >>> getting old metadata from servers that had been partitioned off > > > from > > > > the > > > > > >>> network for a while. Clients would know the difference between > > old > > > > and > > > > > >>> new metadata. So putting epochs in to the metadata request is > > much > > > > less > > > > > >>> "complex" operationally, even though it's an extra field in the > > > > request. > > > > > >>> This has been discussed before on the mailing list. > > > > > >>> > > > > > >>> So I think the bottom line for me is that having the session ID > > and > > > > > >>> session epoch, while it adds two extra fields, reduces > > operational > > > > > >>> complexity and increases debuggability. It avoids tightly > > coupling > > > > us > > > > > >>> to assumptions about reliable ordered delivery which tend to be > > > > violated > > > > > >>> in practice in multiple layers of the stack. Finally, it > avoids > > > the > > > > > >>> necessity of refactoring NetworkClient. > > > > > >> So there is stacks out there that violate TCP guarantees? And > > > software > > > > > >> still works? How can this be? Can you elaborate a little where > > this > > > > > >> can be violated? I am not very familiar with virtualized > > > environments > > > > > >> but they can't really violate TCP contracts. > > > > > > TCP's guarantees of reliable, in-order transmission certainly can > > be > > > > > > violated. For example, I once had to debug a cluster where a > > certain > > > > > > node had a network card which corrupted its transmissions > > > occasionally. > > > > > > With all the layers of checksums, you would think that this was > not > > > > > > possible, but it happened. We occasionally got corrupted data > > > written > > > > > > to disk on the other end because of it. Even more frustrating, > the > > > > data > > > > > > was not corrupted on disk on the sending node-- it was a bug in > the > > > > > > network card driver that was injecting the errors. > > > > > true, but your broker might aswell read a corrupted 600GB as size > > from > > > > > the network and die with OOM instantly. > > > > > > > > If you read 600 GB as the size from the network, you will not "die > with > > > > OOM instantly." That would be a bug. Instead, you will notice that > > 600 > > > > GB is greater than max.message.bytes, and close the connection. > > > > > > > > > Optimizing for still having functional > > > > > software under this circumstances is not reasonable. > > > > > You want to get rid of such a > > > > > node ASAP and pray that zookeepers ticks get corrupted often enough > > > > > that it finally drops out of the cluster. > > > > > > > > > > There is a good reason that these kinda things > > > > > https://issues.apache.org/jira/browse/MESOS-4105 > > > > > don't end up as kafka Jiras. 
In the end you can't run any software > in > > > > > these containers anymore. Application layer checksums are a neat > > thing > > > to > > > > > fail fast but trying to cope with this probably causes more bad > than > > > > > good. So I would argue that we shouldn't try this for the fetch > > > > requests. > > > > > > > > One of the goals of Apache Kafka is to be "a streaming platform... > > > > [that] lets you store streams of records in a fault-tolerant way." > For > > > > more information, see https://kafka.apache.org/intro . > > Fault-tolerance > > > > is explicitly part of the goal of Kafka. Prayer should be optional, > > not > > > > required, when running the software. > > > > > > > > Crashing because someone sent you a bad packet is not reasonable > > > > behavior. It is a bug. Similarly, bringing down the whole cluster, > > > > which could a hundred nodes, because someone had a bad network > adapter > > > > is not reasonable behavior. It is perhaps reasonable for the cluster > > to > > > > perform worse when hardware is having problems. But that's a > different > > > > discussion. > > > > > > > > best, > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > However, my point was not about TCP's guarantees being violated. > > My > > > > > > point is that TCP's guarantees are only one small building block > to > > > > > > build a robust distributed system. TCP basically just says that > if > > > you > > > > > > get any bytes from the stream, you will get the ones that were > sent > > > by > > > > > > the sender, in the order they were sent. TCP does not guarantee > > that > > > > > > the bytes you send will get there. It does not guarantee that if > > you > > > > > > close the connection, the other end will know about it in a > timely > > > > > > fashion. > > > > > These are very powerful grantees and since we use TCP we should > > > > > piggy pack everything that is reasonable on to it. IMO there is no > > > > > need to reimplement correct sequencing again if you get that from > > > > > your transport layer. It saves you the complexity, it makes > > > > > you application behave way more naturally and your api easier to > > > > > understand. > > > > > > > > > > There is literally nothing the Kernel wont let you decide > > > > > especially not any timings. Only noticeable exception being > TIME_WAIT > > > > > of usually 240 seconds but that already has little todo with the > > broker > > > > > itself and > > > > > if we are running out of usable ports because of this then expiring > > > > > fetch requests > > > > > wont help much anyways. > > > > > > > > > > I hope I could strengthen the trust you have in userland TCP > > connection > > > > > management. It is really powerful and can be exploited for maximum > > > gains > > > > > without much risk in my opinion. > > > > > > > > > > > > > > > > > > > > > It does not guarantee that the bytes will be received in a > > > > > > certain timeframe, and certainly doesn't guarantee that if you > > send a > > > > > > byte on connection X and then on connection Y, that the remote > end > > > will > > > > > > read a byte on X before reading a byte on Y. > > > > > Noone expects this from two independent paths of any kind. > > > > > > > > > > > > > > > > > best, > > > > > > Colin > > > > > > > > > > > >> Hope this made my view clearer, especially the first part. 
> > > > > >> > > > > > >> Best Jan > > > > > >> > > > > > >> > > > > > >>> best, > > > > > >>> Colin > > > > > >>> > > > > > >>> > > > > > >>>> If there is an error such as NotLeaderForPartition is > > > > > >>>> returned for some partitions, the follower can always send a > > full > > > > > >>>> FetchRequest. Is there a scenario that only some of the > > partitions > > > > in a > > > > > >>>> FetchResponse is lost? > > > > > >>>> > > > > > >>>> Thanks, > > > > > >>>> > > > > > >>>> Jiangjie (Becket) Qin > > > > > >>>> > > > > > >>>> > > > > > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe< > cmccabe@apache.org > > > > > > > wrote: > > > > > >>>> > > > > > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > > > > > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe< > > > cmccabe@apache.org> > > > > > >>>>> wrote: > > > > > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > > > > > >>>>>>>> Hey Colin, > > > > > >>>>>>>> > > > > > >>>>>>>> Thanks much for the update. I have a few questions below: > > > > > >>>>>>>> > > > > > >>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It > > > > seems that > > > > > >>>>>>>> Fetch > > > > > >>>>>>>> Session Epoch is only needed to help leader distinguish > > > between > > > > "a > > > > > >>>>> full > > > > > >>>>>>>> fetch request" and "a full fetch request and request a new > > > > > >>>>> incremental > > > > > >>>>>>>> fetch session". Alternatively, follower can also indicate > "a > > > > full > > > > > >>>>> fetch > > > > > >>>>>>>> request and request a new incremental fetch session" by > > > setting > > > > Fetch > > > > > >>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does > > this > > > > make > > > > > >>>>> sense? > > > > > >>>>>>> Hi Dong, > > > > > >>>>>>> > > > > > >>>>>>> The fetch session epoch is very important for ensuring > > > > correctness. It > > > > > >>>>>>> prevents corrupted or incomplete fetch data due to network > > > > reordering > > > > > >>>>> or > > > > > >>>>>>> loss. > > > > > >>>>>>> > > > > > >>>>>>> For example, consider a scenario where the follower sends a > > > fetch > > > > > >>>>>>> request to the leader. The leader responds, but the > response > > > is > > > > lost > > > > > >>>>>>> because of network problems which affected the TCP session. > > In > > > > that > > > > > >>>>>>> case, the follower must establish a new TCP session and > > re-send > > > > the > > > > > >>>>>>> incremental fetch request. But the leader does not know > that > > > the > > > > > >>>>>>> follower didn't receive the previous incremental fetch > > > > response. It is > > > > > >>>>>>> only the incremental fetch epoch which lets the leader know > > > that > > > > it > > > > > >>>>>>> needs to resend that data, and not data which comes > > afterwards. > > > > > >>>>>>> > > > > > >>>>>>> You could construct similar scenarios with message > > reordering, > > > > > >>>>>>> duplication, etc. Basically, this is a stateful protocol > on > > an > > > > > >>>>>>> unreliable network, and you need to know whether the > follower > > > > got the > > > > > >>>>>>> previous data you sent before you move on. And you need to > > > > handle > > > > > >>>>>>> issues like duplicated or delayed requests. These issues > do > > > not > > > > affect > > > > > >>>>>>> the full fetch request, because it is not stateful-- any > full > > > > fetch > > > > > >>>>>>> request can be understood and properly responded to in > > > isolation. > > > > > >>>>>>> > > > > > >>>>>> Thanks for the explanation. This makes sense. 
On the other > > hand > > > I > > > > would > > > > > >>>>>> be interested in learning more about whether Becket's > solution > > > > can help > > > > > >>>>>> simplify the protocol by not having the echo field and > whether > > > > that is > > > > > >>>>>> worth doing. > > > > > >>>>> Hi Dong, > > > > > >>>>> > > > > > >>>>> I commented about this in the other thread. A solution which > > > > doesn't > > > > > >>>>> maintain session information doesn't work here. > > > > > >>>>> > > > > > >>>>>>>> 2. It is said that Incremental FetchRequest will include > > > > partitions > > > > > >>>>> whose > > > > > >>>>>>>> fetch offset or maximum number of fetch bytes has been > > > changed. > > > > If > > > > > >>>>>>>> follower's logStartOffet of a partition has changed, > should > > > this > > > > > >>>>>>>> partition also be included in the next FetchRequest to the > > > > leader? > > > > > >>>>>>> Otherwise, it > > > > > >>>>>>>> may affect the handling of DeleteRecordsRequest because > > leader > > > > may > > > > > >>>>> not > > > > > >>>>>>> know > > > > > >>>>>>>> the corresponding data has been deleted on the follower. > > > > > >>>>>>> Yeah, the follower should include the partition if the > > > > logStartOffset > > > > > >>>>>>> has changed. That should be spelled out on the KIP. > Fixed. > > > > > >>>>>>> > > > > > >>>>>>>> 3. In the section "Per-Partition Data", a partition is not > > > > considered > > > > > >>>>>>>> dirty if its log start offset has changed. Later in the > > > section > > > > > >>>>>>> "FetchRequest > > > > > >>>>>>>> Changes", it is said that incremental fetch responses will > > > > include a > > > > > >>>>>>>> partition if its logStartOffset has changed. It seems > > > > inconsistent. > > > > > >>>>> Can > > > > > >>>>>>>> you update the KIP to clarify it? > > > > > >>>>>>>> > > > > > >>>>>>> In the "Per-Partition Data" section, it does say that > > > > logStartOffset > > > > > >>>>>>> changes make a partition dirty, though, right? The first > > > bullet > > > > point > > > > > >>>>>>> is: > > > > > >>>>>>> > > > > > >>>>>>>> * The LogCleaner deletes messages, and this changes the > log > > > > start > > > > > >>>>> offset > > > > > >>>>>>> of the partition on the leader., or > > > > > >>>>>>> > > > > > >>>>>> Ah I see. I think I didn't notice this because statement > > assumes > > > > that the > > > > > >>>>>> LogStartOffset in the leader only changes due to LogCleaner. > > In > > > > fact the > > > > > >>>>>> LogStartOffset can change on the leader due to either log > > > > retention and > > > > > >>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner > > can > > > > change > > > > > >>>>>> LogStartOffset though. It may be a bit better to just say > > that a > > > > > >>>>>> partition is considered dirty if LogStartOffset changes. > > > > > >>>>> I agree. It should be straightforward to just resend the > > > > partition if > > > > > >>>>> logStartOffset changes. > > > > > >>>>> > > > > > >>>>>>>> 4. In "Fetch Session Caching" section, it is said that > each > > > > broker > > > > > >>>>> has a > > > > > >>>>>>>> limited number of slots. How is this number determined? > Does > > > > this > > > > > >>>>> require > > > > > >>>>>>>> a new broker config for this number? > > > > > >>>>>>> Good point. I added two broker configuration parameters to > > > > control > > > > > >>>>> this > > > > > >>>>>>> number. > > > > > >>>>>>> > > > > > >>>>>> I am curious to see whether we can avoid some of these new > > > > configs. 
For > > > > > >>>>>> example, incremental.fetch.session.cache.slots.per.broker > is > > > > probably > > > > > >>>>> not > > > > > >>>>>> necessary because if a leader knows that a FetchRequest > comes > > > > from a > > > > > >>>>>> follower, we probably want the leader to always cache the > > > > information > > > > > >>>>>> from that follower. Does this make sense? > > > > > >>>>> Yeah, maybe we can avoid having > > > > > >>>>> incremental.fetch.session.cache.slots.per.broker. > > > > > >>>>> > > > > > >>>>>> Maybe we can discuss the config later after there is > agreement > > > on > > > > how the > > > > > >>>>>> protocol would look like. > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>>>> What is the error code if broker does > > > > > >>>>>>>> not have new log for the incoming FetchRequest? > > > > > >>>>>>> Hmm, is there a typo in this question? Maybe you meant to > > ask > > > > what > > > > > >>>>>>> happens if there is no new cache slot for the incoming > > > > FetchRequest? > > > > > >>>>>>> That's not an error-- the incremental fetch session ID just > > > gets > > > > set to > > > > > >>>>>>> 0, indicating no incremental fetch session was created. > > > > > >>>>>>> > > > > > >>>>>> Yeah there is a typo. You have answered my question. > > > > > >>>>>> > > > > > >>>>>> > > > > > >>>>>>>> 5. Can you clarify what happens if follower adds a > partition > > > to > > > > the > > > > > >>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? > > Does > > > > leader > > > > > >>>>>>>> needs to generate a new session for this > > ReplicaFetcherThread > > > or > > > > > >>>>> does it > > > > > >>>>>>> re-use > > > > > >>>>>>>> the existing session? If it uses a new session, is the > old > > > > session > > > > > >>>>>>>> actively deleted from the slot? > > > > > >>>>>>> The basic idea is that you can't make changes, except by > > > sending > > > > a full > > > > > >>>>>>> fetch request. However, perhaps we can allow the client to > > > > re-use its > > > > > >>>>>>> existing session ID. If the client sets sessionId = id, > > epoch > > > = > > > > 0, it > > > > > >>>>>>> could re-initialize the session. > > > > > >>>>>>> > > > > > >>>>>> Yeah I agree with the basic idea. We probably want to > > understand > > > > more > > > > > >>>>>> detail about how this works later. > > > > > >>>>> Sounds good. I updated the KIP with this information. A > > > > > >>>>> re-initialization should be exactly the same as an > > > initialization, > > > > > >>>>> except that it reuses an existing ID. > > > > > >>>>> > > > > > >>>>> best, > > > > > >>>>> Colin > > > > > >>>>> > > > > > >>>>> > > > > > >>>>>>>> BTW, I think it may be useful if the KIP can include the > > > example > > > > > >>>>> workflow > > > > > >>>>>>>> of how this feature will be used in case of partition > change > > > > and so > > > > > >>>>> on. > > > > > >>>>>>> Yeah, that might help. > > > > > >>>>>>> > > > > > >>>>>>> best, > > > > > >>>>>>> Colin > > > > > >>>>>>> > > > > > >>>>>>>> Thanks, > > > > > >>>>>>>> Dong > > > > > >>>>>>>> > > > > > >>>>>>>> > > > > > >>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe< > > > > cmccabe@apache.org> > > > > > >>>>>>>> wrote: > > > > > >>>>>>>> > > > > > >>>>>>>>> I updated the KIP with the ideas we've been discussing. 
> > > > > >>>>>>>>> > > > > > >>>>>>>>> best, > > > > > >>>>>>>>> Colin > > > > > >>>>>>>>> > > > > > >>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote: > > > > > >>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote: > > > > > >>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a > really > > > > > >>>>> useful > > > > > >>>>>>> thing. > > > > > >>>>>>>>>>> I just scanned through the discussion so far and wanted > > to > > > > > >>>>> start a > > > > > >>>>>>>>>>> thread to make as decision about keeping the > > > > > >>>>>>>>>>> cache with the Connection / Session or having some sort > > of > > > > UUID > > > > > >>>>>>> indN > > > > > >>>>>>>>> exed > > > > > >>>>>>>>>>> global Map. > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> Sorry if that has been settled already and I missed it. > > In > > > > this > > > > > >>>>>>> case > > > > > >>>>>>>>>>> could anyone point me to the discussion? > > > > > >>>>>>>>>> Hi Jan, > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> I don't think anyone has discussed the idea of tying the > > > cache > > > > > >>>>> to an > > > > > >>>>>>>>>> individual TCP session yet. I agree that since the > cache > > is > > > > > >>>>>>> intended to > > > > > >>>>>>>>>> be used only by a single follower or client, it's an > > > > interesting > > > > > >>>>>>> thing > > > > > >>>>>>>>>> to think about. > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> I guess the obvious disadvantage is that whenever your > TCP > > > > > >>>>> session > > > > > >>>>>>>>>> drops, you have to make a full fetch request rather than > > an > > > > > >>>>>>> incremental > > > > > >>>>>>>>>> one. It's not clear to me how often this happens in > > > practice > > > > -- > > > > > >>>>> it > > > > > >>>>>>>>>> probably depends a lot on the quality of the network. > > From > > > a > > > > > >>>>> code > > > > > >>>>>>>>>> perspective, it might also be a bit difficult to access > > data > > > > > >>>>>>> associated > > > > > >>>>>>>>>> with the Session from classes like KafkaApis (although > we > > > > could > > > > > >>>>>>> refactor > > > > > >>>>>>>>>> it to make this easier). > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> It's also clear that even if we tie the cache to the > > > session, > > > > we > > > > > >>>>>>> still > > > > > >>>>>>>>>> have to have limits on the number of caches we're > willing > > to > > > > > >>>>> create. > > > > > >>>>>>>>>> And probably we should reserve some cache slots for each > > > > > >>>>> follower, so > > > > > >>>>>>>>>> that clients don't take all of them. > > > > > >>>>>>>>>> > > > > > >>>>>>>>>>> Id rather see a protocol in which the client is hinting > > the > > > > > >>>>> broker > > > > > >>>>>>>>> that, > > > > > >>>>>>>>>>> he is going to use the feature instead of a client > > > > > >>>>>>>>>>> realizing that the broker just offered the feature > > > > (regardless > > > > > >>>>> of > > > > > >>>>>>>>>>> protocol version which should only indicate that the > > > feature > > > > > >>>>>>>>>>> would be usable). > > > > > >>>>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do > think > > > > that > > > > > >>>>> the > > > > > >>>>>>>>>> server should have the option of not accepting > incremental > > > > > >>>>> requests > > > > > >>>>>>> from > > > > > >>>>>>>>>> specific clients, in order to save memory space. 
> > > > > >>>>>>>>>> > > > > > >>>>>>>>>>> This seems to work better with a per > > > > > >>>>>>>>>>> connection/session attached Metadata than with a Map > and > > > > could > > > > > >>>>>>> allow > > > > > >>>>>>>>> for > > > > > >>>>>>>>>>> easier client implementations. > > > > > >>>>>>>>>>> It would also make Client-side code easier as there > > > wouldn't > > > > > >>>>> be any > > > > > >>>>>>>>>>> Cache-miss error Messages to handle. > > > > > >>>>>>>>>> It is nice not to have to handle cache-miss responses, I > > > > agree. > > > > > >>>>>>>>>> However, TCP sessions aren't exposed to most of our > > > > client-side > > > > > >>>>> code. > > > > > >>>>>>>>>> For example, when the Producer creates a message and > hands > > > it > > > > > >>>>> off to > > > > > >>>>>>> the > > > > > >>>>>>>>>> NetworkClient, the NC will transparently re-connect and > > > > re-send a > > > > > >>>>>>>>>> message if the first send failed. The higher-level code > > > will > > > > > >>>>> not be > > > > > >>>>>>>>>> informed about whether the TCP session was > re-established, > > > > > >>>>> whether an > > > > > >>>>>>>>>> existing TCP session was used, and so on. So overall I > > > would > > > > > >>>>> still > > > > > >>>>>>> lean > > > > > >>>>>>>>>> towards not coupling this to the TCP session... > > > > > >>>>>>>>>> > > > > > >>>>>>>>>> best, > > > > > >>>>>>>>>> Colin > > > > > >>>>>>>>>> > > > > > >>>>>>>>>>> Thank you again for the KIP. And again, if this was > > > > clarified > > > > > >>>>>>> already > > > > > >>>>>>>>>>> please drop me a hint where I could read about it. > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> Best Jan > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> > > > > > >>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote: > > > > > >>>>>>>>>>>> Hi all, > > > > > >>>>>>>>>>>> > > > > > >>>>>>>>>>>> I created a KIP to improve the scalability and latency > > of > > > > > >>>>>>>>> FetchRequest: > > > > > >>>>>>>>>>>> https://cwiki.apache.org/ > confluence/display/KAFKA/KIP- > > > > > >>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+ > > > > > >>>>>>>>> Partition+Scalability > > > > > >>>>>>>>>>>> Please take a look. > > > > > >>>>>>>>>>>> > > > > > >>>>>>>>>>>> cheers, > > > > > >>>>>>>>>>>> Colin > > > > > > > > > > > > > > > --001a113e4f54bb091c055faf57d9--