kafka-dev mailing list archives
From Colin McCabe <cmcc...@apache.org>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Sat, 02 Dec 2017 22:37:53 GMT
On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmccabe@apache.org> wrote:
> 
> > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Thanks much for the update. I have a few questions below:
> > >
> > > 1. I am not very sure that we need Fetch Session Epoch. It seems that
> > > Fetch Session Epoch is only needed to help leader distinguish between
> > > "a full fetch request" and "a full fetch request and request a new
> > > incremental fetch session". Alternatively, follower can also indicate
> > > "a full fetch request and request a new incremental fetch session" by
> > > setting Fetch Session ID to -1 without using Fetch Session Epoch. Does
> > > this make sense?
> >
> > Hi Dong,
> >
> > The fetch session epoch is very important for ensuring correctness.  It
> > prevents corrupted or incomplete fetch data due to network reordering or
> > loss.
> >
> > For example, consider a scenario where the follower sends a fetch
> > request to the leader.  The leader responds, but the response is lost
> > because of network problems which affected the TCP session.  In that
> > case, the follower must establish a new TCP session and re-send the
> > incremental fetch request.  But the leader does not know that the
> > follower didn't receive the previous incremental fetch response.  It is
> > only the incremental fetch epoch which lets the leader know that it
> > needs to resend that data, and not data which comes afterwards.
> >
> > You could construct similar scenarios with message reordering,
> > duplication, etc.  Basically, this is a stateful protocol on an
> > unreliable network, and you need to know whether the follower got the
> > previous data you sent before you move on.  And you need to handle
> > issues like duplicated or delayed requests.  These issues do not affect
> > the full fetch request, because it is not stateful-- any full fetch
> > request can be understood and properly responded to in isolation.
> >
> 
> Thanks for the explanation. This makes sense. On the other hand I would
> be interested in learning more about whether Becket's solution can help
> simplify the protocol by not having the epoch field and whether that is
> worth doing.

Hi Dong,

I commented about this in the other thread.  A solution which doesn't
maintain session information doesn't work here.
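
To make the epoch argument concrete, here is a minimal sketch of how a
leader might use a per-session epoch to tell an in-order request from a
re-sent one. The class and function names are hypothetical and this is
not the KIP's wire format, just an illustration under the assumption
that the follower increments the epoch by one on each incremental
request.

```python
from dataclasses import dataclass


@dataclass
class FetchSession:
    """Hypothetical leader-side state for one incremental fetch session."""
    session_id: int
    next_epoch: int = 1  # epoch expected on the next incremental request


def classify_request(session: FetchSession, request_epoch: int) -> str:
    """Classify an incremental fetch request by its epoch (illustrative only)."""
    if request_epoch < 1:
        # Assumption: epoch 0 (full fetch / re-initialization) is handled elsewhere.
        raise ValueError("this sketch only covers incremental requests")
    if request_epoch == session.next_epoch:
        # The follower saw the previous response; it is safe to move on.
        session.next_epoch += 1
        return "in_order"
    if request_epoch == session.next_epoch - 1:
        # The follower is repeating its last request, so the previous
        # response was probably lost and that data must be sent again.
        return "retransmit"
    # Delayed, duplicated, or reordered request.
    return "out_of_sequence"
```

Without the epoch, the "retransmit" case looks identical to "in_order",
and the leader would skip data the follower never received.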

> 
> 
> 
> >
> > >
> > > 2. It is said that Incremental FetchRequest will include partitions whose
> > > fetch offset or maximum number of fetch bytes has been changed. If
> > > follower's logStartOffset of a partition has changed, should this
> > > partition also be included in the next FetchRequest to the leader?
> > > Otherwise, it may affect the handling of DeleteRecordsRequest because
> > > leader may not know the corresponding data has been deleted on the
> > > follower.
> >
> > Yeah, the follower should include the partition if the logStartOffset
> > has changed.  That should be spelled out on the KIP.  Fixed.
> >
> > >
> > > 3. In the section "Per-Partition Data", a partition is not considered
> > > dirty if its log start offset has changed. Later in the section
> > > "FetchRequest Changes", it is said that incremental fetch responses will
> > > include a partition if its logStartOffset has changed. It seems
> > > inconsistent. Can you update the KIP to clarify it?
> > >
> >
> > In the "Per-Partition Data" section, it does say that logStartOffset
> > changes make a partition dirty, though, right?  The first bullet point
> > is:
> >
> > > * The LogCleaner deletes messages, and this changes the log start
> > > offset of the partition on the leader, or
> >
> 
> Ah I see. I think I didn't notice this because the statement assumes that
> the LogStartOffset in the leader only changes due to LogCleaner. In fact
> the LogStartOffset can change on the leader due to either log retention or
> DeleteRecordsRequest. I haven't verified whether LogCleaner can change
> LogStartOffset though. It may be a bit better to just say that a
> partition is considered dirty if LogStartOffset changes.

I agree.  It should be straightforward to just resend the partition if
logStartOffset changes.
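
As a rough sketch of that rule, the check can simply compare the log
start offset the leader last sent against the current one, regardless of
what caused the change. The function name and the dict-based bookkeeping
are assumptions for illustration. Other reasons a partition might be
dirty are left out.

```python
def partitions_to_resend(cached_start_offsets, current_start_offsets):
    """Return partitions whose log start offset differs from the cached value.

    Both arguments are hypothetical dicts mapping (topic, partition) tuples
    to a log start offset. The cause of the change (retention,
    DeleteRecordsRequest, or anything else) is deliberately ignored.
    """
    return sorted(
        tp
        for tp, offset in current_start_offsets.items()
        # A partition missing from the cache is also treated as changed.
        if cached_start_offsets.get(tp) != offset
    )
```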

> 
> 
> >
> > > 4. In "Fetch Session Caching" section, it is said that each broker has a
> > > limited number of slots. How is this number determined? Does this require
> > > a new broker config for this number?
> >
> > Good point.  I added two broker configuration parameters to control this
> > number.
> >
> 
> I am curious to see whether we can avoid some of these new configs. For
> example, incremental.fetch.session.cache.slots.per.broker is probably not
> necessary because if a leader knows that a FetchRequest comes from a
> follower, we probably want the leader to always cache the information
> from that follower. Does this make sense?

Yeah, maybe we can avoid having
incremental.fetch.session.cache.slots.per.broker.

> 
> Maybe we can discuss the config later after there is agreement on what the
> protocol would look like.
> 
> 
> >
> > > What is the error code if broker does
> > > not have new log for the incoming FetchRequest?
> >
> > Hmm, is there a typo in this question?  Maybe you meant to ask what
> > happens if there is no new cache slot for the incoming FetchRequest?
> > That's not an error-- the incremental fetch session ID just gets set to
> > 0, indicating no incremental fetch session was created.
> >
> 
> Yeah there is a typo. You have answered my question.
> 
> 
> >
> > >
> > > 5. Can you clarify what happens if follower adds a partition to the
> > > ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does leader
> > > need to generate a new session for this ReplicaFetcherThread or does it
> > > re-use the existing session?  If it uses a new session, is the old
> > > session actively deleted from the slot?
> >
> > The basic idea is that you can't make changes, except by sending a full
> > fetch request.  However, perhaps we can allow the client to re-use its
> > existing session ID.  If the client sets sessionId = id, epoch = 0, it
> > could re-initialize the session.
> >
> 
> Yeah I agree with the basic idea. We probably want to understand more
> detail about how this works later.

Sounds good.  I updated the KIP with this information.  A
re-initialization should be exactly the same as an initialization,
except that it reuses an existing ID.
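
Roughly, and only as a sketch with made-up names, the leader side could
look like the following. It assumes a full fetch either names an
existing session ID to re-initialize or passes no ID to ask for a new
session. How the client actually signals "new session" on the wire is
not settled here.

```python
import itertools

NO_SESSION = 0  # ID 0 in the response: no incremental fetch session was created


class FetchSessionCache:
    """Hypothetical leader-side cache with a fixed number of session slots."""

    def __init__(self, max_slots):
        self.max_slots = max_slots
        self.sessions = {}  # session_id -> {"epoch": int, "partitions": set}
        self._ids = itertools.count(1)  # never hands out NO_SESSION

    def handle_full_fetch(self, partitions, session_id=None):
        """Create or re-initialize a session; return its ID, or NO_SESSION."""
        if session_id in self.sessions:
            # Re-initialization: same steps as initialization, same ID.
            self.sessions[session_id] = {"epoch": 0, "partitions": set(partitions)}
            return session_id
        if len(self.sessions) >= self.max_slots:
            # No free slot. This is not an error; the fetch is served as a
            # plain full fetch without a session.
            return NO_SESSION
        new_id = next(self._ids)
        self.sessions[new_id] = {"epoch": 0, "partitions": set(partitions)}
        return new_id
```

With this shape, adding a partition to a fetcher means sending a full
fetch with the old ID and the new partition set, and the old slot is
overwritten rather than leaked.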

best,
Colin


> 
> 
> >
> > >
> > >
> > > BTW, I think it may be useful if the KIP can include the example workflow
> > > of how this feature will be used in case of partition change and so on.
> >
> > Yeah, that might help.
> >
> > best,
> > Colin
> >
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmccabe@apache.org>
> > > wrote:
> > >
> > > > I updated the KIP with the ideas we've been discussing.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > > Hi Colin, thank you for this KIP, it can become a really useful
> > > > > > > thing.
> > > > > > >
> > > > > > > I just scanned through the discussion so far and wanted to start
> > > > > > > a thread to make a decision about keeping the cache with the
> > > > > > > Connection / Session or having some sort of UUID indexed global
> > > > > > > Map.
> > > > > > >
> > > > > > > Sorry if that has been settled already and I missed it. In this
> > > > > > > case could anyone point me to the discussion?
> > > > >
> > > > > Hi Jan,
> > > > >
> > > > > > I don't think anyone has discussed the idea of tying the cache to
> > > > > > an individual TCP session yet.  I agree that since the cache is
> > > > > > intended to be used only by a single follower or client, it's an
> > > > > > interesting thing to think about.
> > > > > >
> > > > > > I guess the obvious disadvantage is that whenever your TCP session
> > > > > > drops, you have to make a full fetch request rather than an
> > > > > > incremental one.  It's not clear to me how often this happens in
> > > > > > practice -- it probably depends a lot on the quality of the
> > > > > > network.  From a code perspective, it might also be a bit
> > > > > > difficult to access data associated with the Session from classes
> > > > > > like KafkaApis (although we could refactor it to make this
> > > > > > easier).
> > > > > >
> > > > > > It's also clear that even if we tie the cache to the session, we
> > > > > > still have to have limits on the number of caches we're willing to
> > > > > > create.  And probably we should reserve some cache slots for each
> > > > > > follower, so that clients don't take all of them.
> > > > >
> > > > > >
> > > > > > > I'd rather see a protocol in which the client is hinting to the
> > > > > > > broker that it is going to use the feature instead of a client
> > > > > > > realizing that the broker just offered the feature (regardless
> > > > > > > of protocol version which should only indicate that the feature
> > > > > > > would be usable).
> > > > >
> > > > > > Hmm.  I'm not sure what you mean by "hinting."  I do think that the
> > > > > > server should have the option of not accepting incremental requests
> > > > > > from specific clients, in order to save memory space.
> > > > >
> > > > > > > This seems to work better with a per connection/session attached
> > > > > > > Metadata than with a Map and could allow for easier client
> > > > > > > implementations.
> > > > > > > It would also make Client-side code easier as there wouldn't be
> > > > > > > any Cache-miss error Messages to handle.
> > > > >
> > > > > > It is nice not to have to handle cache-miss responses, I agree.
> > > > > > However, TCP sessions aren't exposed to most of our client-side
> > > > > > code.  For example, when the Producer creates a message and hands
> > > > > > it off to the NetworkClient, the NC will transparently re-connect
> > > > > > and re-send a message if the first send failed.  The higher-level
> > > > > > code will not be informed about whether the TCP session was
> > > > > > re-established, whether an existing TCP session was used, and so
> > > > > > on.  So overall I would still lean towards not coupling this to
> > > > > > the TCP session...
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > > Thank you again for the KIP. And again, if this was clarified
> > > > > > > already please drop me a hint where I could read about it.
> > > > > >
> > > > > > Best Jan
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > > I created a KIP to improve the scalability and latency of
> > > > > > > > FetchRequest:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > >
> > > > > > > Please take a look.
> > > > > > >
> > > > > > > cheers,
> > > > > > > Colin
> > > > > >
> > > >
> >
