From: Colin McCabe
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date: Sat, 02 Dec 2017 14:37:53 -0800

On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe wrote:
>
> > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Thanks much for the update. I have a few questions below:
> > >
> > > 1. I am not very sure that we need the Fetch Session Epoch. It
> > > seems that the Fetch Session Epoch is only needed to help the
> > > leader distinguish between "a full fetch request" and "a full
> > > fetch request that also requests a new incremental fetch session".
> > > Alternatively, the follower could also indicate "a full fetch
> > > request that also requests a new incremental fetch session" by
> > > setting the Fetch Session ID to -1, without using the Fetch
> > > Session Epoch. Does this make sense?
> >
> > Hi Dong,
> >
> > The fetch session epoch is very important for ensuring correctness.
> > It prevents corrupted or incomplete fetch data due to network
> > reordering or loss.
> >
> > For example, consider a scenario where the follower sends a fetch
> > request to the leader. The leader responds, but the response is
> > lost because of network problems which affected the TCP session. In
> > that case, the follower must establish a new TCP session and
> > re-send the incremental fetch request. But the leader does not know
> > that the follower didn't receive the previous incremental fetch
> > response. It is only the incremental fetch epoch which lets the
> > leader know that it needs to resend that data, and not data which
> > comes afterwards.
> >
> > You could construct similar scenarios with message reordering,
> > duplication, etc. Basically, this is a stateful protocol on an
> > unreliable network, and you need to know whether the follower got
> > the previous data you sent before you move on. And you need to
> > handle issues like duplicated or delayed requests. These issues do
> > not affect the full fetch request, because it is not stateful --
> > any full fetch request can be understood and properly responded to
> > in isolation.
>
> Thanks for the explanation. This makes sense. On the other hand, I
> would be interested in learning more about whether Becket's solution
> can help simplify the protocol by not having the epoch field, and
> whether that is worth doing.

Hi Dong,

I commented about this in the other thread. A solution which doesn't
maintain session information doesn't work here.
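To make that concrete, here is a rough sketch of the kind of check the
leader could do when it looks up a fetch session. (Purely illustrative
-- the names and structure here are made up, not the actual KIP-227
code.)

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class FetchSessionCache {
        static class Session {
            int epoch = 0;  // the last epoch the leader accepted
            // ... cached per-partition fetch state omitted ...
        }

        private final Map<Integer, Session> sessions =
            new ConcurrentHashMap<>();

        // Returns true if this incremental fetch request is the one
        // the leader expected.  On an epoch mismatch, the follower
        // never processed our last response (it was lost, reordered,
        // or duplicated), so the cached state can't be trusted: the
        // session is dropped, forcing the follower back to a full
        // fetch request.
        boolean validateIncrementalFetch(int sessionId, int requestEpoch) {
            Session session = sessions.get(sessionId);
            if (session == null) {
                return false;  // unknown session: must do a full fetch
            }
            if (requestEpoch != session.epoch + 1) {
                sessions.remove(sessionId);
                return false;  // stale or duplicated request
            }
            session.epoch = requestEpoch;  // accept and advance
            return true;
        }
    }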
> > > 2. It is said that an Incremental FetchRequest will include
> > > partitions whose fetch offset or maximum number of fetch bytes
> > > has changed. If the follower's logStartOffset of a partition has
> > > changed, should this partition also be included in the next
> > > FetchRequest to the leader? Otherwise, it may affect the handling
> > > of DeleteRecordsRequest, because the leader may not know that the
> > > corresponding data has been deleted on the follower.
> >
> > Yeah, the follower should include the partition if the
> > logStartOffset has changed. That should be spelled out in the KIP.
> > Fixed.
> >
> > > 3. In the section "Per-Partition Data", a partition is not
> > > considered dirty if its log start offset has changed. Later, in
> > > the section "FetchRequest Changes", it is said that incremental
> > > fetch responses will include a partition if its logStartOffset
> > > has changed. It seems inconsistent. Can you update the KIP to
> > > clarify it?
> >
> > In the "Per-Partition Data" section, it does say that
> > logStartOffset changes make a partition dirty, though, right? The
> > first bullet point is:
> >
> > > * The LogCleaner deletes messages, and this changes the log start
> > > offset of the partition on the leader, or
>
> Ah, I see. I think I didn't notice this because the statement assumes
> that the LogStartOffset on the leader only changes due to the
> LogCleaner. In fact, the LogStartOffset can change on the leader due
> to either log retention or a DeleteRecordsRequest. I haven't verified
> whether the LogCleaner can change the LogStartOffset, though. It may
> be a bit better to just say that a partition is considered dirty if
> its LogStartOffset changes.

I agree. It should be straightforward to just resend the partition if
the logStartOffset changes.
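In other words, the follower-side bookkeeping amounts to something
like this (again a purely illustrative sketch, not the actual patch):

    // A partition goes into the next incremental fetch request only
    // if something the leader needs to know about has changed since
    // the last value the leader acknowledged.
    class PartitionFetchState {
        long fetchOffset;
        long logStartOffset;
        int maxBytes;

        // The values as last acknowledged by the leader.
        long ackedFetchOffset;
        long ackedLogStartOffset;
        int ackedMaxBytes;

        boolean isDirty() {
            return fetchOffset != ackedFetchOffset
                || logStartOffset != ackedLogStartOffset  // retention,
                                                          // DeleteRecords, etc.
                || maxBytes != ackedMaxBytes;
        }
    }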
> > > 4. In the "Fetch Session Caching" section, it is said that each
> > > broker has a limited number of slots. How is this number
> > > determined? Does this require a new broker config for this
> > > number?
> >
> > Good point. I added two broker configuration parameters to control
> > this number.
>
> I am curious to see whether we can avoid some of these new configs.
> For example, incremental.fetch.session.cache.slots.per.broker is
> probably not necessary, because if a leader knows that a FetchRequest
> comes from a follower, we probably want the leader to always cache
> the information from that follower. Does this make sense?

Yeah, maybe we can avoid having
incremental.fetch.session.cache.slots.per.broker.

> Maybe we can discuss the configs later, after there is agreement on
> what the protocol will look like.
>
> > > What is the error code if broker does not have new log for the
> > > incoming FetchRequest?
> >
> > Hmm, is there a typo in this question? Maybe you meant to ask what
> > happens if there is no new cache slot for the incoming
> > FetchRequest? That's not an error -- the incremental fetch session
> > ID just gets set to 0, indicating that no incremental fetch session
> > was created.
>
> Yeah, there is a typo. You have answered my question.
>
> > > 5. Can you clarify what happens if the follower adds a partition
> > > to the ReplicaFetcherThread after receiving a LeaderAndIsrRequest?
> > > Does the leader need to generate a new session for this
> > > ReplicaFetcherThread, or does it re-use the existing session? If
> > > it uses a new session, is the old session actively deleted from
> > > the slot?
> >
> > The basic idea is that you can't make changes, except by sending a
> > full fetch request. However, perhaps we can allow the client to
> > re-use its existing session ID. If the client sets sessionId = id,
> > epoch = 0, it could re-initialize the session.
>
> Yeah, I agree with the basic idea. We probably want to understand in
> more detail how this works later.

Sounds good. I updated the KIP with this information. A
re-initialization should be exactly the same as an initialization,
except that it reuses an existing ID.
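Concretely, the client-side rules for the two session fields would
look something like this (an illustrative sketch; the constants and
helper names are hypothetical, not the final protocol):

    class FetchSessionFields {
        static final int NO_SESSION_ID = 0;  // no incremental session
        static final int INITIAL_EPOCH = 0;  // marks a full fetch

        final int sessionId;
        final int epoch;

        private FetchSessionFields(int sessionId, int epoch) {
            this.sessionId = sessionId;
            this.epoch = epoch;
        }

        // Full fetch which asks the leader to create a brand-new
        // session.
        static FetchSessionFields newSession() {
            return new FetchSessionFields(NO_SESSION_ID, INITIAL_EPOCH);
        }

        // Full fetch which re-initializes an existing session in
        // place, e.g. after a LeaderAndIsrRequest added partitions to
        // the fetcher.  Same semantics as initialization, but the
        // existing ID is kept.
        static FetchSessionFields reinitialize(int existingId) {
            return new FetchSessionFields(existingId, INITIAL_EPOCH);
        }

        // Incremental fetch within an established session.
        static FetchSessionFields incremental(int existingId, int nextEpoch) {
            return new FetchSessionFields(existingId, nextEpoch);
        }
    }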
best,
Colin

> > > BTW, I think it may be useful if the KIP could include an example
> > > workflow of how this feature will be used in the case of a
> > > partition change, and so on.
> >
> > Yeah, that might help.
> >
> > best,
> > Colin
> >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe wrote:
> > >
> > > > I updated the KIP with the ideas we've been discussing.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > Hi Colin, thank you for this KIP, it can become a really
> > > > > > useful thing.
> > > > > >
> > > > > > I just scanned through the discussion so far and wanted to
> > > > > > start a thread to make a decision about keeping the cache
> > > > > > with the Connection / Session, or having some sort of
> > > > > > UUID-indexed global Map.
> > > > > >
> > > > > > Sorry if that has been settled already and I missed it. In
> > > > > > this case, could anyone point me to the discussion?
> > > > >
> > > > > Hi Jan,
> > > > >
> > > > > I don't think anyone has discussed the idea of tying the cache
> > > > > to an individual TCP session yet. I agree that since the cache
> > > > > is intended to be used only by a single follower or client,
> > > > > it's an interesting thing to think about.
> > > > >
> > > > > I guess the obvious disadvantage is that whenever your TCP
> > > > > session drops, you have to make a full fetch request rather
> > > > > than an incremental one. It's not clear to me how often this
> > > > > happens in practice -- it probably depends a lot on the
> > > > > quality of the network. From a code perspective, it might also
> > > > > be a bit difficult to access data associated with the Session
> > > > > from classes like KafkaApis (although we could refactor it to
> > > > > make this easier).
> > > > >
> > > > > It's also clear that even if we tie the cache to the session,
> > > > > we still have to have limits on the number of caches we're
> > > > > willing to create. And probably we should reserve some cache
> > > > > slots for each follower, so that clients don't take all of
> > > > > them.
> > > > >
> > > > > > I'd rather see a protocol in which the client is hinting to
> > > > > > the broker that it is going to use the feature, instead of
> > > > > > the client realizing that the broker just offered the
> > > > > > feature (regardless of the protocol version, which should
> > > > > > only indicate that the feature would be usable).
> > > > >
> > > > > Hmm. I'm not sure what you mean by "hinting." I do think that
> > > > > the server should have the option of not accepting incremental
> > > > > requests from specific clients, in order to save memory space.
> > > > >
> > > > > > This seems to work better with per-connection/session
> > > > > > attached metadata than with a Map, and could allow for
> > > > > > easier client implementations. It would also make
> > > > > > client-side code easier, as there wouldn't be any cache-miss
> > > > > > error messages to handle.
> > > > >
> > > > > It is nice not to have to handle cache-miss responses, I
> > > > > agree. However, TCP sessions aren't exposed to most of our
> > > > > client-side code. For example, when the Producer creates a
> > > > > message and hands it off to the NetworkClient, the NC will
> > > > > transparently re-connect and re-send the message if the first
> > > > > send failed. The higher-level code will not be informed about
> > > > > whether the TCP session was re-established, whether an
> > > > > existing TCP session was used, and so on. So overall, I would
> > > > > still lean towards not coupling this to the TCP session...
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > > Thank you again for the KIP. And again, if this was
> > > > > > clarified already, please drop me a hint where I could read
> > > > > > about it.
> > > > > >
> > > > > > Best Jan
> > > > > >
> > > > > > On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I created a KIP to improve the scalability and latency of
> > > > > > > FetchRequest:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > >
> > > > > > > Please take a look.
> > > > > > >
> > > > > > > cheers,
> > > > > > > Colin
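P.S. Regarding reserving cache slots for followers (mentioned in the
quoted discussion above): the intent is roughly the following. This is
only an illustrative sketch -- the class and names are made up, not
the actual implementation.

    // Followers may claim any free slot; ordinary consumer clients
    // are confined to the unreserved pool, so they can never starve
    // the followers out of the cache.
    class SessionSlotPool {
        private final int totalSlots;
        private final int reservedForFollowers;
        private int followerSlotsUsed = 0;
        private int clientSlotsUsed = 0;

        SessionSlotPool(int totalSlots, int reservedForFollowers) {
            this.totalSlots = totalSlots;
            this.reservedForFollowers = reservedForFollowers;
        }

        synchronized boolean tryAcquire(boolean isFollower) {
            if (followerSlotsUsed + clientSlotsUsed >= totalSlots) {
                return false;  // completely full
            }
            if (isFollower) {
                followerSlotsUsed++;
                return true;
            }
            if (clientSlotsUsed < totalSlots - reservedForFollowers) {
                clientSlotsUsed++;
                return true;
            }
            return false;  // only follower-reserved capacity remains;
                           // the client falls back to full fetches
        }
    }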