kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Becket Qin <becket....@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Tue, 02 Jan 2018 12:46:27 GMT
Hi Colin,

Good point about KIP-226. Maybe a separate broker epoch is needed although
it is a little awkward to let the consumer set this. So was there a
solution to the frequent pause and resume scenario? Did I miss something?

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe <cmccabe@apache.org> wrote:

> On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > Hi Colin,
> >
> > Thanks for the explanation. I want to clarify a bit more on my thoughts.
> >
> > I am fine with having a separate discussion as long as the follow-up
> > discussion will be incremental on top of this KIP instead of override the
> > protocol in this KIP.
>
> Hi Becket,
>
> Thanks for the clarification.  I do think that the changes we've been
> discussing would be incremental rather than completely replacing what we've
> talked about here.  See my responses inline below.
>
> >
> > I completely agree this KIP is useful by itself. That being said, we want
> > to avoid falling into a "local optimal" solution by just saying because
> it
> > solves the problem in this scope. I think we should also think if the
> > solution aligns with a "global optimal" (systematic optimal) solution as
> > well. That is why I brought up other considerations. If they turned out
> to
> > be orthogonal and should be addressed separately, that's good. But at
> least
> > it is worth thinking about the potential connections between those
> things.
> >
> > One example of such related consideration is the following two seemingly
> > unrelated things:
> >
> > 1. I might have missed the discussion, but it seems the concern of the
> > clients doing frequent pause and resume is still not addressed. Since
> this
> > is a pretty common use case for applications that want to have flow
> > control, or have prioritized consumption, or get consumption fairness, we
> > probably want to see how to handle this case. One of the solution might
> be
> > a long-lived session id spanning the clients' life time.
> >
> > 2. KAFKA-6029. The key problem is that the leader wants to know if a
> fetch
> > request is from a shutting down broker or from a restarted broker.
> >
> > The connection between those two issues is that both of them could be
> > addressed by having a life-long session id for each client (or fetcher,
> to
> > be more accurate). This may indicate that having a life long session id
> > might be a "global optimal" solution so it should be considered in this
> > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
> > introduce a broker epoch unnecessarily (which will not be used by the
> > consumers at all) or override what we do in this KIP.
>
> Remember that a given follower will have more than one fetch session ID.
> Each fetcher thread will have its own session ID.  And we will eventually
> be able to dynamically add or remove fetcher threads using KIP-226.
> Therefore, we can't use fetch session IDs to uniquely identify a given
> broker incarnation.  Any time we increase the number of fetcher threads, a
> new fetch session ID will show up.
>
> If we want to know if a fetch request is from a shutting down broker or
> from a restarted broker, the most straightforward and robust way would
> probably be to add an incarnation number for each broker.  ZK can track
> this number.  This also helps with debugging and logging (you can tell
> "aha-- this request came from the second incarnation, not the first."
>
> > BTW, to clarify, the main purpose of returning the data at the index
> > boundary was to get the same benefit of efficient incremental fetch for
> > both low vol and high vol partitions, which is directly related to the
> > primary goal in this KIP. The other things (such as avoiding binary
> search)
> > are just potential additional gain, and they are also brought up to see
> if
> > that could be a "global optimal" solution.
>
> I still think these are separate.  The primary goal of the KIP was to make
> fetch requests where not all partitions are returning data more efficient.
> This isn't really related to the goal of trying to make accessing
> historical data more efficient.  In most cases, the data we're accessing is
> very recent data, and index lookups are not an issue.
>
> >
> > Some other replies below.
> > >In order for improvements to succeed, I think that it's important to
> > clearly define the scope and goals.  One good example of this was the
> > AdminClient KIP.  We deliberately avoiding ?>discussing new
> administrative
> > RPCs in that KIP, in order to limit the scope.  This kept the discussion
> > focused on the user interfaces and configuration, rather than on the
> > details of possible >new RPCs.  Once the KIP was completed, it was easy
> for
> > us to add new RPCs later in separate KIPs.
> > Hmm, why is AdminClient is related? All the discussion are about how to
> > make fetch more efficient, right?
> >
> > >Finally, it's not clear that the approach you are proposing is the right
> > way to go.  I think we would need to have a lot more discussion about it.
> > One very big disadvantage is that it couples >what we send back on the
> wire
> > tightly to what is on the disk.  It's not clear that we want to do that.
> > What if we want to change how things are stored in the future?  How does
> > this work with >clients' own concept of fetch sizes?  And so on, and so
> > on.  This needs its own discussion thread.
> > That might be true. However, the index file by definition is for the
> files
> > stored on the disk. So if we decide to change the storage layer to
> > something else, it seems natural to use some other suitable ways to get
> the
> > offsets efficiently.
> >
> > >There are a lot of simpler solutions that might work as well or better.
> > For example, each partition could keep an in-memory LRU cache of the most
> > recently used offset to file position >mappings.  Or we could have a
> thread
> > periodically touch the latest page or two of memory in the index file for
> > each partition, to make sure that it didn't fall out of the cache.  In
> some
> > offline >discussions, some of these approaches have looked quite
> > promising.  I've even seen some good performance numbers for prototypes.
> > In any case, it's a separate problem which needs its >own KIP, I think.
> > Those are indeed separate discussions. I was not intended to discuss them
> > in this KIP. Sorry about the confusion.
> >
> > Thanks and Merry Christmas,
>
> Happy new year.  Sorry if some of my responses are delayed (I'm on
> vacation).
>
> cheers,
> Colin
>
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Sat, Dec 23, 2017 at 1:16 AM, Colin McCabe <cmccabe@apache.org>
> wrote:
> >
> > > On Fri, Dec 22, 2017, at 14:31, Becket Qin wrote:
> > > > >>
> > > > >> The point I want to make is that avoiding doing binary search on
> index
> > > > >> file and avoid reading the log segments during fetch has some
> > > additional
> > > > >> benefits. So if the solution works for the current KIP, it might
> be a
> > > > >> better choice.
> > > >
> > > > >Let's discuss this in a follow-on KIP.
> > > >
> > > > If the discussion will potentially change the protocol in the current
> > > > proposal. Would it be better to discuss it now instead of in a
> follow-up
> > > > KIP so we don't have some protocol that immediately requires a
> change.
> > >
> > > Hi Becket,
> > >
> > > I think that the problem that you are discussing is different than the
> > > problem this KIP is designed to address.  This KIP is targeted at
> > > eliminating the wastefulness of re-transmitting information about
> > > partitions that haven't changed in every FetchRequest and
> FetchResponse.
> > > The problem you are discussing is dealing with situations where the
> index
> > > file or the data file is not in the page cache, and therefore we take a
> > > page fault when doing an index lookup.
> > >
> > > This KIP is useful and valuable on its own.  For example, if you have
> > > brokers in a public cloud in different availability zones, you may
> wish to
> > > minimize the network traffic between them.  Therefore, you don't want
> every
> > > FetchRequest between brokers to be a full FetchRequest.  In that case,
> this
> > > KIP is very valuable.
> > >
> > > In order for improvements to succeed, I think that it's important to
> > > clearly define the scope and goals.  One good example of this was the
> > > AdminClient KIP.  We deliberately avoiding discussing new
> administrative
> > > RPCs in that KIP, in order to limit the scope.  This kept the
> discussion
> > > focused on the user interfaces and configuration, rather than on the
> > > details of possible new RPCs.  Once the KIP was completed, it was easy
> for
> > > us to add new RPCs later in separate KIPs.
> > >
> > > While it's clear that there is probably even more we could do to
> optimize
> > > fetch requests, making them incremental seems like a good first cut.  I
> > > deliberately avoided changing the replication protocol in this KIP,
> because
> > > I think that it's a big enough change as-is.  If we want to change the
> > > replication protocol in the future, there is nothing preventing us...
> and
> > > this change will be a useful starting point.
> > >
> > > Finally, it's not clear that the approach you are proposing is the
> right
> > > way to go.  I think we would need to have a lot more discussion about
> it.
> > > One very big disadvantage is that it couples what we send back on the
> wire
> > > tightly to what is on the disk.  It's not clear that we want to do
> that.
> > > What if we want to change how things are stored in the future?  How
> does
> > > this work with clients' own concept of fetch sizes?  And so on, and so
> on.
> > > This needs its own discussion thread.
> > >
> > > There are a lot of simpler solutions that might work as well or better.
> > > For example, each partition could keep an in-memory LRU cache of the
> most
> > > recently used offset to file position mappings.  Or we could have a
> thread
> > > periodically touch the latest page or two of memory in the index file
> for
> > > each partition, to make sure that it didn't fall out of the cache.  In
> some
> > > offline discussions, some of these approaches have looked quite
> promising.
> > > I've even seen some good performance numbers for prototypes.  In any
> case,
> > > it's a separate problem which needs its own KIP, I think.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > >
> > > > On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <colin@cmccabe.xyz>
> wrote:
> > > >
> > > > > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> > > > > > Sorry for coming back at this so late.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 11.12.2017 07:12, Colin McCabe wrote:
> > > > > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> > > > > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > > > > > >>> Hi,
> > > > > > >>>
> > > > > > >>> sorry for the late reply, busy times :-/
> > > > > > >>>
> > > > > > >>> I would ask you one thing maybe. Since the timeout
> > > > > > >>> argument seems to be settled I have no further argument
> > > > > > >>> form your side except the "i don't want to".
> > > > > > >>>
> > > > > > >>> Can you see that connection.max.idle.max is the exact time
> > > > > > >>> that expresses "We expect the client to be away for this
> long,
> > > > > > >>> and come back and continue"?
> > > > > > >> Hi Jan,
> > > > > > >>
> > > > > > >> Sure, connection.max.idle.max is the exact time that we want
> to
> > > keep
> > > > > > >> around a TCP session.  TCP sessions are relatively cheap, so
> we
> > > can
> > > > > > >> afford to keep them around for 10 minutes by default.
> Incremental
> > > > > fetch
> > > > > > >> state is less cheap, so we want to set a shorter timeout for
> it.
> > > We
> > > > > > >> also want new TCP sessions to be able to reuse an existing
> > > incremental
> > > > > > >> fetch session rather than creating a new one and waiting for
> the
> > > old
> > > > > one
> > > > > > >> to time out.
> > > > > > >>
> > > > > > >>> also clarified some stuff inline
> > > > > > >>>
> > > > > > >>> Best Jan
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On 05.12.2017 23:14, Colin McCabe wrote:
> > > > > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > >>>>> Hi Colin
> > > > > > >>>>>
> > > > > > >>>>> Addressing the topic of how to manage slots from the other
> > > thread.
> > > > > > >>>>> With tcp connections all this comes for free essentially.
> > > > > > >>>> Hi Jan,
> > > > > > >>>>
> > > > > > >>>> I don't think that it's accurate to say that cache
> management
> > > > > "comes for
> > > > > > >>>> free" by coupling the incremental fetch session with the TCP
> > > > > session.
> > > > > > >>>> When a new TCP session is started by a fetch request, you
> still
> > > > > have to
> > > > > > >>>> decide whether to grant that request an incremental fetch
> > > session or
> > > > > > >>>> not.  If your answer is that you always grant the request, I
> > > would
> > > > > argue
> > > > > > >>>> that you do not have cache management.
> > > > > > >>> First I would say, the client has a big say in this. If the
> > > client
> > > > > > >>> is not going to issue incremental he shouldn't ask for a
> cache
> > > > > > >>> when the client ask for the cache we still have all options
> to
> > > deny.
> > > > > > >> To put it simply, we have to have some cache management above
> and
> > > > > beyond
> > > > > > >> just giving out an incremental fetch session to anyone who
> has a
> > > TCP
> > > > > > >> session.  Therefore, caching does not become simpler if you
> > > couple the
> > > > > > >> fetch session to the TCP session.
> > > > > > Simply giving out an fetch session for everyone with a
> connection is
> > > too
> > > > > > simple,
> > > > > > but I think it plays well into the idea of consumers choosing to
> use
> > > the
> > > > > > feature
> > > > > > therefore only enabling where it brings maximum gains
> > > > > > (replicas,MirrorMakers)
> > > > > > >>
> > > > > > >>>> I guess you could argue that timeouts are cache management,
> but
> > > I
> > > > > don't
> > > > > > >>>> find that argument persuasive.  Anyone could just create a
> lot
> > > of
> > > > > TCP
> > > > > > >>>> sessions and use a lot of resources, in that case.  So
> there is
> > > > > > >>>> essentially no limit on memory use.  In any case, TCP
> sessions
> > > don't
> > > > > > >>>> help us implement fetch session timeouts.
> > > > > > >>> We still have all the options denying the request to keep the
> > > state.
> > > > > > >>> What you want seems like a max connections / ip safeguard.
> > > > > > >>> I can currently take down a broker with to many connections
> > > easily.
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>>> I still would argue we disable it by default and make a
> flag
> > > in the
> > > > > > >>>>> broker to ask the leader to maintain the cache while
> > > replicating
> > > > > and also only
> > > > > > >>>>> have it optional in consumers (default to off) so one can
> turn
> > > it
> > > > > on
> > > > > > >>>>> where it really hurts.  MirrorMaker and audit consumers
> > > > > prominently.
> > > > > > >>>> I agree with Jason's point from earlier in the thread.
> Adding
> > > extra
> > > > > > >>>> configuration knobs that aren't really necessary can harm
> > > usability.
> > > > > > >>>> Certainly asking people to manually turn on a feature
> "where it
> > > > > really
> > > > > > >>>> hurts" seems to fall in that category, when we could easily
> > > enable
> > > > > it
> > > > > > >>>> automatically for them.
> > > > > > >>> This doesn't make much sense to me.
> > > > > > >> There are no tradeoffs to think about from the client's point
> of
> > > view:
> > > > > > >> it always wants an incremental fetch session.  So there is no
> > > benefit
> > > > > to
> > > > > > >> making the clients configure an extra setting.  Updating and
> > > managing
> > > > > > >> client configurations is also more difficult than managing
> broker
> > > > > > >> configurations for most users.
> > > > > > >>
> > > > > > >>> You also wanted to implement
> > > > > > >>> a "turn of in case of bug"-knob. Having the client indicate
> if
> > > the
> > > > > > >>> feauture will be used seems reasonable to me.,
> > > > > > >> True.  However, if there is a bug, we could also roll back the
> > > client,
> > > > > > >> so having this configuration knob is not strictly required.
> > > > > > >>
> > > > > > >>>>> Otherwise I left a few remarks in-line, which should help
> to
> > > > > understand
> > > > > > >>>>> my view of the situation better
> > > > > > >>>>>
> > > > > > >>>>> Best Jan
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > >>>>>>>>> Thanks for the explanation, Colin. A few more
> questions.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> The session epoch is not complex.  It's just a number
> > > which
> > > > > increments
> > > > > > >>>>>>>>>> on each incremental fetch.  The session epoch is also
> > > useful
> > > > > for
> > > > > > >>>>>>>>>> debugging-- it allows you to match up requests and
> > > responses
> > > > > when
> > > > > > >>>>>>>>>> looking at log files.
> > > > > > >>>>>>>>> Currently each request in Kafka has a correlation id to
> > > help
> > > > > match the
> > > > > > >>>>>>>>> requests and responses. Is epoch doing something
> > > differently?
> > > > > > >>>>>>>> Hi Becket,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> The correlation ID is used within a single TCP session,
> to
> > > > > uniquely
> > > > > > >>>>>>>> associate a request with a response.  The correlation
> ID is
> > > not
> > > > > unique
> > > > > > >>>>>>>> (and has no meaning) outside the context of that single
> TCP
> > > > > session.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP
> sessions,
> > > and
> > > > > generally
> > > > > > >>>>>>>> tries to hide that information from the upper layers of
> the
> > > > > code.  So
> > > > > > >>>>>>>> when you submit a request to NetworkClient, you don't
> know
> > > if
> > > > > that
> > > > > > >>>>>>>> request creates a TCP session, or reuses an existing
> one.
> > > > > > >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the client
> > > misses
> > > > > an
> > > > > > >>>>>>>>>> increment fetch response about a partition.  And then
> the
> > > > > partition is
> > > > > > >>>>>>>>>> never updated after that.  The client has no way to
> know
> > > > > about the
> > > > > > >>>>>>>>>> partition, since it won't be included in any future
> > > > > incremental fetch
> > > > > > >>>>>>>>>> responses.  And there are no offsets to compare,
> since the
> > > > > partition is
> > > > > > >>>>>>>>>> simply omitted from the response.
> > > > > > >>>>>>>>> I am curious about in which situation would the
> follower
> > > miss
> > > > > a response
> > > > > > >>>>>>>>> of a partition. If the entire FetchResponse is lost
> (e.g.
> > > > > timeout), the
> > > > > > >>>>>>>>> follower would disconnect and retry. That will result
> in
> > > > > sending a full
> > > > > > >>>>>>>>> FetchRequest.
> > > > > > >>>>>>>> Basically, you are proposing that we rely on TCP for
> > > reliable
> > > > > delivery
> > > > > > >>>>>>>> in a distributed system.  That isn't a good idea for a
> > > bunch of
> > > > > > >>>>>>>> different reasons.  First of all, TCP timeouts tend to
> be
> > > very
> > > > > long.  So
> > > > > > >>>>>>>> if the TCP session timing out is your error detection
> > > > > mechanism, you
> > > > > > >>>>>>>> have to wait minutes for messages to timeout.  Of
> course, we
> > > > > add a
> > > > > > >>>>>>>> timeout on top of that after which we declare the
> connection
> > > > > bad and
> > > > > > >>>>>>>> manually close it.  But just because the session is
> closed
> > > on
> > > > > one end
> > > > > > >>>>>>>> doesn't mean that the other end knows that it is
> closed.  So
> > > > > the leader
> > > > > > >>>>>>>> may have to wait quite a long time before TCP decides
> that
> > > yes,
> > > > > > >>>>>>>> connection X from the follower is dead and not coming
> back,
> > > > > even though
> > > > > > >>>>>>>> gremlins ate the FIN packet which the follower
> attempted to
> > > > > translate.
> > > > > > >>>>>>>> If the cache state is tied to that TCP session, we have
> to
> > > keep
> > > > > that
> > > > > > >>>>>>>> cache around for a much longer time than we should.
> > > > > > >>>>>>> Hi,
> > > > > > >>>>>>>
> > > > > > >>>>>>> I see this from a different perspective. The cache expiry
> > > time
> > > > > > >>>>>>> has the same semantic as idle connection time in this
> > > scenario.
> > > > > > >>>>>>> It is the time range we expect the client to come back an
> > > reuse
> > > > > > >>>>>>> its broker side state. I would argue that on close we
> would
> > > get
> > > > > an
> > > > > > >>>>>>> extra shot at cleaning up the session state early. As
> > > opposed to
> > > > > > >>>>>>> always wait for that duration for expiry to happen.
> > > > > > >>>>>> Hi Jan,
> > > > > > >>>>>>
> > > > > > >>>>>> The idea here is that the incremental fetch cache expiry
> time
> > > can
> > > > > be
> > > > > > >>>>>> much shorter than the TCP session timeout.  In general
> the TCP
> > > > > session
> > > > > > >>>>>> timeout is common to all TCP connections, and very long.
> To
> > > make
> > > > > these
> > > > > > >>>>>> numbers a little more concrete, the TCP session timeout is
> > > often
> > > > > > >>>>>> configured to be 2 hours on Linux.  (See
> > > > > > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or-
> > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > >>>>>> )  The timeout I was proposing for incremental fetch
> sessions
> > > was
> > > > > one or
> > > > > > >>>>>> two minutes at most.
> > > > > > >>>>> Currently this is taken care of by
> > > > > > >>>>> connections.max.idle.ms on the broker and defaults to
> > > something
> > > > > of few
> > > > > > >>>>> minutes.
> > > > > > >>>> It is 10 minutes by default, which is longer than what we
> want
> > > the
> > > > > > >>>> incremental fetch session timeout to be.  There's no reason
> to
> > > > > couple
> > > > > > >>>> these two things.
> > > > > > >>>>
> > > > > > >>>>> Also something we could let the client change if we really
> > > wanted
> > > > > to.
> > > > > > >>>>> So there is no need to worry about coupling our
> implementation
> > > to
> > > > > some
> > > > > > >>>>> timeouts given by the OS, with TCP one always has full
> control
> > > > > over the worst
> > > > > > >>>>> times + one gets the extra shot cleaning up early when the
> > > close
> > > > > comes through.
> > > > > > >>>>> Which is the majority of the cases.
> > > > > > >>>> In the majority of cases, the TCP session will be
> > > re-established.
> > > > > In
> > > > > > >>>> that case, we have to send a full fetch request rather than
> an
> > > > > > >>>> incremental fetch request.
> > > > > > >>> I actually have a hard time believing this. Do you have any
> > > numbers
> > > > > of
> > > > > > >>> any existing production system? Is it the virtualisation
> layer
> > > > > cutting
> > > > > > >>> all the connections?
> > > > > > >>> We see this only on application crashes and restarts where
> the
> > > app
> > > > > needs
> > > > > > >>> todo the full anyways
> > > > > > >>> as it probably continues with stores offsets.
> > > > > > >> Yes, TCP connections get dropped.  It happens very often in
> > > production
> > > > > > >> clusters, actually.  When I was working on Hadoop, one of the
> most
> > > > > > >> common questions I heard from newcomers was "why do I see so
> many
> > > > > > >> EOFException messages in the logs"?  The other thing that
> happens
> > > a
> > > > > lot
> > > > > > >> is DNS outages or slowness.  Public clouds seem to have even
> more
> > > > > > >> unstable networks than the on-premise clusters.  I am not
> sure why
> > > > > that
> > > > > > >> is.
> > > > > > Hadoop has a wiki page on exactly this
> > > > > > https://wiki.apache.org/hadoop/EOFException
> > > > > >
> > > > > > besides user errors they have servers crashing and actually loss
> of
> > > > > > connection high on their list.
> > > > > > In the case of "server goes away" the cache goes with it. So
> nothing
> > > to
> > > > > > argue about the cache beeing reused by
> > > > > > a new connection.
> > > > > >
> > > > > > Can you make an argument at which point the epoch would be
> updated
> > > > > > broker side to maximise re-usage of the cache on
> > > > > > lost connections. In many cases the epoch would go out of sync
> and we
> > > > > > would need a full fetch anyways. Am I mistaken here?
> > > > >
> > > > > The current proposal is that the server can accept multiple
> requests
> > > in a
> > > > > row with the same sequence number.
> > > > >
> > > > > Colin
> > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > >>
> > > > > > >>>>>>>> Secondly, from a software engineering perspective, it's
> not
> > > a
> > > > > good idea
> > > > > > >>>>>>>> to try to tightly tie together TCP and our code.  We
> would
> > > have
> > > > > to
> > > > > > >>>>>>>> rework how we interact with NetworkClient so that we are
> > > aware
> > > > > of things
> > > > > > >>>>>>>> like TCP sessions closing or opening.  We would have to
> be
> > > > > careful
> > > > > > >>>>>>>> preserve the ordering of incoming messages when doing
> things
> > > > > like
> > > > > > >>>>>>>> putting incoming requests on to a queue to be processed
> by
> > > > > multiple
> > > > > > >>>>>>>> threads.  It's just a lot of complexity to add, and
> there's
> > > no
> > > > > upside.
> > > > > > >>>>>>> I see the point here. And I had a small chat with Dong
> Lin
> > > > > already
> > > > > > >>>>>>> making me aware of this. I tried out the approaches and
> > > propose
> > > > > the
> > > > > > >>>>>>> following:
> > > > > > >>>>>>>
> > > > > > >>>>>>> The client start and does a full fetch. It then does
> > > incremental
> > > > > fetches.
> > > > > > >>>>>>> The connection to the broker dies and is re-established
> by
> > > > > NetworkClient
> > > > > > >>>>>>> under the hood.
> > > > > > >>>>>>> The broker sees an incremental fetch without having
> state =>
> > > > > returns
> > > > > > >>>>>>> error:
> > > > > > >>>>>>> Client sees the error, does a full fetch and goes back to
> > > > > incrementally
> > > > > > >>>>>>> fetching.
> > > > > > >>>>>>>
> > > > > > >>>>>>> having this 1 additional error round trip is essentially
> the
> > > > > same as
> > > > > > >>>>>>> when something
> > > > > > >>>>>>> with the sessions or epoch changed unexpectedly to the
> client
> > > > > (say
> > > > > > >>>>>>> expiry).
> > > > > > >>>>>>>
> > > > > > >>>>>>> So its nothing extra added but the conditions are easier
> to
> > > > > evaluate.
> > > > > > >>>>>>> Especially since we do everything with NetworkClient.
> Other
> > > > > implementers
> > > > > > >>>>>>> on the
> > > > > > >>>>>>> protocol are free to optimizes this and do not do the
> > > errornours
> > > > > > >>>>>>> roundtrip on the
> > > > > > >>>>>>> new connection.
> > > > > > >>>>>>> Its a great plus that the client can know when the error
> is
> > > gonna
> > > > > > >>>>>>> happen. instead of
> > > > > > >>>>>>> the server to always have to report back if something
> changes
> > > > > > >>>>>>> unexpectedly for the client
> > > > > > >>>>>> You are assuming that the leader and the follower agree
> that
> > > the
> > > > > TCP
> > > > > > >>>>>> session drops at the same time.  When there are network
> > > problems,
> > > > > this
> > > > > > >>>>>> may not be true.  The leader may still think the previous
> TCP
> > > > > session is
> > > > > > >>>>>> active.  In that case, we have to keep the incremental
> fetch
> > > > > session
> > > > > > >>>>>> state around until we learn otherwise (which could be up
> to
> > > that
> > > > > 2 hour
> > > > > > >>>>>> timeout I mentioned).  And if we get a new incoming
> > > incremental
> > > > > fetch
> > > > > > >>>>>> request, we can't assume that it replaces the previous
> one,
> > > > > because the
> > > > > > >>>>>> IDs will be different (the new one starts a new session).
> > > > > > >>>>> As mentioned, no reason to fear some time-outs out of our
> > > control
> > > > > > >>>>>>>> Imagine that I made an argument that client IDs are
> > > "complex"
> > > > > and should
> > > > > > >>>>>>>> be removed from our APIs.  After all, we can just look
> at
> > > the
> > > > > remote IP
> > > > > > >>>>>>>> address and TCP port of each connection.  Would you
> think
> > > that
> > > > > was a
> > > > > > >>>>>>>> good idea?  The client ID is useful when looking at
> logs.
> > > For
> > > > > example,
> > > > > > >>>>>>>> if a rebalance is having problems, you want to know what
> > > > > clients were
> > > > > > >>>>>>>> having a problem.  So having the client ID field to
> guide
> > > you is
> > > > > > >>>>>>>> actually much less "complex" in practice than not
> having an
> > > ID.
> > > > > > >>>>>>> I still cant follow why the correlation idea will not
> help
> > > here.
> > > > > > >>>>>>> Correlating logs with it usually works great. Even with
> > > > > primitive tools
> > > > > > >>>>>>> like grep
> > > > > > >>>>>> The correlation ID does help somewhat, but certainly not
> as
> > > much
> > > > > as a
> > > > > > >>>>>> unique 64-bit ID.  The correlation ID is not unique in the
> > > > > broker, just
> > > > > > >>>>>> unique to a single NetworkClient.  Simiarly, the
> correlation
> > > ID
> > > > > is not
> > > > > > >>>>>> unique on the client side, if there are multiple
> Consumers,
> > > etc.
> > > > > > >>>>> Can always bump entropy in correlation IDs, never had a
> problem
> > > > > > >>>>> of finding to many duplicates. Would be a different KIP
> though.
> > > > > > >>>>>>>> Similarly, if metadata responses had epoch numbers
> (simple
> > > > > incrementing
> > > > > > >>>>>>>> numbers), we would not have to debug problems like
> clients
> > > > > accidentally
> > > > > > >>>>>>>> getting old metadata from servers that had been
> partitioned
> > > off
> > > > > from the
> > > > > > >>>>>>>> network for a while.  Clients would know the difference
> > > between
> > > > > old and
> > > > > > >>>>>>>> new metadata.  So putting epochs in to the metadata
> request
> > > is
> > > > > much less
> > > > > > >>>>>>>> "complex" operationally, even though it's an extra
> field in
> > > the
> > > > > request.
> > > > > > >>>>>>>>      This has been discussed before on the mailing list.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> So I think the bottom line for me is that having the
> > > session ID
> > > > > and
> > > > > > >>>>>>>> session epoch, while it adds two extra fields, reduces
> > > > > operational
> > > > > > >>>>>>>> complexity and increases debuggability.  It avoids
> tightly
> > > > > coupling us
> > > > > > >>>>>>>> to assumptions about reliable ordered delivery which
> tend
> > > to be
> > > > > violated
> > > > > > >>>>>>>> in practice in multiple layers of the stack.  Finally,
> it
> > > > > avoids the
> > > > > > >>>>>>>> necessity of refactoring NetworkClient.
> > > > > > >>>>>>> So there is stacks out there that violate TCP
> guarantees? And
> > > > > software
> > > > > > >>>>>>> still works? How can this be? Can you elaborate a little
> > > where
> > > > > this
> > > > > > >>>>>>> can be violated? I am not very familiar with virtualized
> > > > > environments
> > > > > > >>>>>>> but they can't really violate TCP contracts.
> > > > > > >>>>>> TCP's guarantees of reliable, in-order transmission
> certainly
> > > can
> > > > > be
> > > > > > >>>>>> violated.  For example, I once had to debug a cluster
> where a
> > > > > certain
> > > > > > >>>>>> node had a network card which corrupted its transmissions
> > > > > occasionally.
> > > > > > >>>>>> With all the layers of checksums, you would think that
> this
> > > was
> > > > > not
> > > > > > >>>>>> possible, but it happened.  We occasionally got corrupted
> data
> > > > > written
> > > > > > >>>>>> to disk on the other end because of it.  Even more
> > > frustrating,
> > > > > the data
> > > > > > >>>>>> was not corrupted on disk on the sending node-- it was a
> bug
> > > in
> > > > > the
> > > > > > >>>>>> network card driver that was injecting the errors.
> > > > > > >>>>> true, but your broker might aswell read a corrupted 600GB
> as
> > > size
> > > > > from
> > > > > > >>>>> the network and die with OOM instantly.
> > > > > > >>>> If you read 600 GB as the size from the network, you will
> not
> > > "die
> > > > > with
> > > > > > >>>> OOM instantly."  That would be a bug.  Instead, you will
> notice
> > > > > that 600
> > > > > > >>>> GB is greater than max.message.bytes, and close the
> connection.
> > > > > > >>> We only check max.message.bytes to late to guard against
> consumer
> > > > > > >>> stalling.
> > > > > > >>> we dont have a notion of max.networkpacket.size before we
> > > allocate
> > > > > the
> > > > > > >>> bytebuffer to read it into.
> > > > > > >> "network packets" are not the same thing as "kafka RPCs."  One
> > > Kafka
> > > > > RPC
> > > > > > >> could take up mutiple ethernet packets.
> > > > > > >>
> > > > > > >> Also, max.message.bytes has nothing to do with "consumer
> > > stalling" --
> > > > > > >> you are probably thinking about some of the fetch request
> > > > > > >> configurations.  max.message.bytes is used by the RPC system
> to
> > > figure
> > > > > > >> out whether to read the full incoming RP
> > > > > > > Whoops, this is incorrect.  I was thinking about
> > > > > > > "socket.request.max.bytes" rather than "max.message.bytes."
> Sorry
> > > > > about
> > > > > > > that.  See Ismael's email as well.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >> best,
> > > > > > >> Colin
> > > > > > >>
> > > > > > >>>>> Optimizing for still having functional
> > > > > > >>>>> software under this circumstances is not reasonable.
> > > > > > >>>>> You want to get rid of such a
> > > > > > >>>>> node ASAP and pray that zookeepers ticks get corrupted
> often
> > > enough
> > > > > > >>>>> that it finally drops out of the cluster.
> > > > > > >>>>>
> > > > > > >>>>> There is a good reason that these kinda things
> > > > > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > >>>>> don't end up as kafka Jiras. In the end you can't run any
> > > software
> > > > > in
> > > > > > >>>>> these containers anymore. Application layer checksums are a
> > > neat
> > > > > thing to
> > > > > > >>>>> fail fast but trying to cope with this probably causes
> more bad
> > > > > than
> > > > > > >>>>> good.  So I would argue that we shouldn't try this for the
> > > fetch
> > > > > requests.
> > > > > > >>>> One of the goals of Apache Kafka is to be "a streaming
> > > platform...
> > > > > > >>>> [that] lets you store streams of records in a fault-tolerant
> > > way."
> > > > > For
> > > > > > >>>> more information, see https://kafka.apache.org/intro .
> > > > > Fault-tolerance
> > > > > > >>>> is explicitly part of the goal of Kafka.  Prayer should be
> > > > > optional, not
> > > > > > >>>> required, when running the software.
> > > > > > >>> Yes, we need to fail ASAP when we read corrupted packages. It
> > > seemed
> > > > > > >>> to me like you tried to make the case for pray and try to
> stay
> > > alive.
> > > > > > >>> Fault
> > > > > > >>> tolerance here means. I am a fishy box i am going to let a
> good
> > > box
> > > > > > >>> handle
> > > > > > >>> it and be silent until i get fixed up.
> > > > > > >>>> Crashing because someone sent you a bad packet is not
> reasonable
> > > > > > >>>> behavior.  It is a bug.  Similarly, bringing down the whole
> > > cluster,
> > > > > > >>>> which could a hundred nodes, because someone had a bad
> network
> > > > > adapter
> > > > > > >>>> is not reasonable behavior.  It is perhaps reasonable for
> the
> > > > > cluster to
> > > > > > >>>> perform worse when hardware is having problems.  But that's
> a
> > > > > different
> > > > > > >>>> discussion.
> > > > > > >>> See above.
> > > > > > >>>> best,
> > > > > > >>>> Colin
> > > > > > >>>>
> > > > > > >>>>>> However, my point was not about TCP's guarantees being
> > > violated.
> > > > > My
> > > > > > >>>>>> point is that TCP's guarantees are only one small building
> > > block
> > > > > to
> > > > > > >>>>>> build a robust distributed system.  TCP basically just
> says
> > > that
> > > > > if you
> > > > > > >>>>>> get any bytes from the stream, you will get the ones that
> were
> > > > > sent by
> > > > > > >>>>>> the sender, in the order they were sent.  TCP does not
> > > guarantee
> > > > > that
> > > > > > >>>>>> the bytes you send will get there.  It does not guarantee
> > > that if
> > > > > you
> > > > > > >>>>>> close the connection, the other end will know about it in
> a
> > > timely
> > > > > > >>>>>> fashion.
> > > > > > >>>>> These are very powerful grantees and since we use TCP we
> should
> > > > > > >>>>> piggy pack everything that is reasonable on to it. IMO
> there
> > > is no
> > > > > > >>>>> need to reimplement correct sequencing again if you get
> that
> > > from
> > > > > > >>>>> your transport layer. It saves you the complexity, it makes
> > > > > > >>>>> you application behave way more naturally and your api
> easier
> > > to
> > > > > > >>>>> understand.
> > > > > > >>>>>
> > > > > > >>>>> There is literally nothing the Kernel wont let you decide
> > > > > > >>>>> especially not any timings. Only noticeable exception being
> > > > > TIME_WAIT
> > > > > > >>>>> of usually 240 seconds but that already has little todo
> with
> > > the
> > > > > broker
> > > > > > >>>>> itself and
> > > > > > >>>>> if we are running out of usable ports because of this then
> > > expiring
> > > > > > >>>>> fetch requests
> > > > > > >>>>> wont help much anyways.
> > > > > > >>>>>
> > > > > > >>>>> I hope I could strengthen the trust you have in userland
> TCP
> > > > > connection
> > > > > > >>>>> management. It is really powerful and can be exploited for
> > > maximum
> > > > > gains
> > > > > > >>>>> without much risk in my opinion.
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>> It does not guarantee that the bytes will be received in a
> > > > > > >>>>>> certain timeframe, and certainly doesn't guarantee that
> if you
> > > > > send a
> > > > > > >>>>>> byte on connection X and then on connection Y, that the
> remote
> > > > > end will
> > > > > > >>>>>> read a byte on X before reading a byte on Y.
> > > > > > >>>>> Noone expects this from two independent paths of any kind.
> > > > > > >>>>>
> > > > > > >>>>>> best,
> > > > > > >>>>>> Colin
> > > > > > >>>>>>
> > > > > > >>>>>>> Hope this made my view clearer, especially the first
> part.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Best Jan
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>>> best,
> > > > > > >>>>>>>> Colin
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> If there is an error such as NotLeaderForPartition is
> > > > > > >>>>>>>>> returned for some partitions, the follower can always
> send
> > > a
> > > > > full
> > > > > > >>>>>>>>> FetchRequest. Is there a scenario that only some of the
> > > > > partitions in a
> > > > > > >>>>>>>>> FetchResponse is lost?
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Jiangjie (Becket) Qin
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > cmccabe@apache.org>  wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > > > cmccabe@apache.org>
> > > > > > >>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > > >>>>>>>>>>>>> Hey Colin,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thanks much for the update. I have a few questions
> > > below:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch Session
> > > Epoch. It
> > > > > seems that
> > > > > > >>>>>>>>>>>>> Fetch
> > > > > > >>>>>>>>>>>>> Session Epoch is only needed to help leader
> distinguish
> > > > > between "a
> > > > > > >>>>>>>>>> full
> > > > > > >>>>>>>>>>>>> fetch request" and "a full fetch request and
> request a
> > > new
> > > > > > >>>>>>>>>> incremental
> > > > > > >>>>>>>>>>>>> fetch session". Alternatively, follower can also
> > > indicate
> > > > > "a full
> > > > > > >>>>>>>>>> fetch
> > > > > > >>>>>>>>>>>>> request and request a new incremental fetch
> session" by
> > > > > setting Fetch
> > > > > > >>>>>>>>>>>>> Session ID to -1 without using Fetch Session Epoch.
> > > Does
> > > > > this make
> > > > > > >>>>>>>>>> sense?
> > > > > > >>>>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> The fetch session epoch is very important for
> ensuring
> > > > > correctness.  It
> > > > > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due to
> > > network
> > > > > reordering
> > > > > > >>>>>>>>>> or
> > > > > > >>>>>>>>>>>> loss.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> For example, consider a scenario where the follower
> > > sends a
> > > > > fetch
> > > > > > >>>>>>>>>>>> request to the leader.  The leader responds, but the
> > > > > response is lost
> > > > > > >>>>>>>>>>>> because of network problems which affected the TCP
> > > > > session.  In that
> > > > > > >>>>>>>>>>>> case, the follower must establish a new TCP session
> and
> > > > > re-send the
> > > > > > >>>>>>>>>>>> incremental fetch request.  But the leader does not
> know
> > > > > that the
> > > > > > >>>>>>>>>>>> follower didn't receive the previous incremental
> fetch
> > > > > response.  It is
> > > > > > >>>>>>>>>>>> only the incremental fetch epoch which lets the
> leader
> > > know
> > > > > that it
> > > > > > >>>>>>>>>>>> needs to resend that data, and not data which comes
> > > > > afterwards.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> You could construct similar scenarios with message
> > > > > reordering,
> > > > > > >>>>>>>>>>>> duplication, etc.  Basically, this is a stateful
> > > protocol
> > > > > on an
> > > > > > >>>>>>>>>>>> unreliable network, and you need to know whether the
> > > > > follower got the
> > > > > > >>>>>>>>>>>> previous data you sent before you move on.  And you
> > > need to
> > > > > handle
> > > > > > >>>>>>>>>>>> issues like duplicated or delayed requests.  These
> > > issues
> > > > > do not affect
> > > > > > >>>>>>>>>>>> the full fetch request, because it is not
> stateful-- any
> > > > > full fetch
> > > > > > >>>>>>>>>>>> request can be understood and properly responded to
> in
> > > > > isolation.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thanks for the explanation. This makes sense. On the
> > > other
> > > > > hand I would
> > > > > > >>>>>>>>>>> be interested in learning more about whether Becket's
> > > > > solution can help
> > > > > > >>>>>>>>>>> simplify the protocol by not having the echo field
> and
> > > > > whether that is
> > > > > > >>>>>>>>>>> worth doing.
> > > > > > >>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I commented about this in the other thread.  A
> solution
> > > which
> > > > > doesn't
> > > > > > >>>>>>>>>> maintain session information doesn't work here.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest will
> > > include
> > > > > partitions
> > > > > > >>>>>>>>>> whose
> > > > > > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes has
> been
> > > > > changed. If
> > > > > > >>>>>>>>>>>>> follower's logStartOffet of a partition has
> changed,
> > > > > should this
> > > > > > >>>>>>>>>>>>> partition also be included in the next
> FetchRequest to
> > > the
> > > > > leader?
> > > > > > >>>>>>>>>>>> Otherwise, it
> > > > > > >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest
> because
> > > > > leader may
> > > > > > >>>>>>>>>> not
> > > > > > >>>>>>>>>>>> know
> > > > > > >>>>>>>>>>>>> the corresponding data has been deleted on the
> > > follower.
> > > > > > >>>>>>>>>>>> Yeah, the follower should include the partition if
> the
> > > > > logStartOffset
> > > > > > >>>>>>>>>>>> has changed.  That should be spelled out on the KIP.
> > > Fixed.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a
> partition is
> > > not
> > > > > considered
> > > > > > >>>>>>>>>>>>> dirty if its log start offset has changed. Later
> in the
> > > > > section
> > > > > > >>>>>>>>>>>> "FetchRequest
> > > > > > >>>>>>>>>>>>> Changes", it is said that incremental fetch
> responses
> > > will
> > > > > include a
> > > > > > >>>>>>>>>>>>> partition if its logStartOffset has changed. It
> seems
> > > > > inconsistent.
> > > > > > >>>>>>>>>> Can
> > > > > > >>>>>>>>>>>>> you update the KIP to clarify it?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say
> that
> > > > > logStartOffset
> > > > > > >>>>>>>>>>>> changes make a partition dirty, though, right?  The
> > > first
> > > > > bullet point
> > > > > > >>>>>>>>>>>> is:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this
> changes the
> > > > > log start
> > > > > > >>>>>>>>>> offset
> > > > > > >>>>>>>>>>>> of the partition on the leader., or
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> Ah I see. I think I didn't notice this because
> statement
> > > > > assumes that the
> > > > > > >>>>>>>>>>> LogStartOffset in the leader only changes due to
> > > LogCleaner.
> > > > > In fact the
> > > > > > >>>>>>>>>>> LogStartOffset can change on the leader due to
> either log
> > > > > retention and
> > > > > > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether
> > > LogCleaner
> > > > > can change
> > > > > > >>>>>>>>>>> LogStartOffset though. It may be a bit better to
> just say
> > > > > that a
> > > > > > >>>>>>>>>>> partition is considered dirty if LogStartOffset
> changes.
> > > > > > >>>>>>>>>> I agree.  It should be straightforward to just resend
> the
> > > > > partition if
> > > > > > >>>>>>>>>> logStartOffset changes.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said
> that
> > > > > each broker
> > > > > > >>>>>>>>>> has a
> > > > > > >>>>>>>>>>>>> limited number of slots. How is this number
> determined?
> > > > > Does this
> > > > > > >>>>>>>>>> require
> > > > > > >>>>>>>>>>>>> a new broker config for this number?
> > > > > > >>>>>>>>>>>> Good point.  I added two broker configuration
> > > parameters to
> > > > > control
> > > > > > >>>>>>>>>> this
> > > > > > >>>>>>>>>>>> number.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> I am curious to see whether we can avoid some of
> these
> > > new
> > > > > configs. For
> > > > > > >>>>>>>>>>> example, incremental.fetch.session.
> > > cache.slots.per.broker
> > > > > is probably
> > > > > > >>>>>>>>>> not
> > > > > > >>>>>>>>>>> necessary because if a leader knows that a
> FetchRequest
> > > > > comes from a
> > > > > > >>>>>>>>>>> follower, we probably want the leader to always
> cache the
> > > > > information
> > > > > > >>>>>>>>>>> from that follower. Does this make sense?
> > > > > > >>>>>>>>>> Yeah, maybe we can avoid having
> > > > > > >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> Maybe we can discuss the config later after there is
> > > > > agreement on how the
> > > > > > >>>>>>>>>>> protocol would look like.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> What is the error code if broker does
> > > > > > >>>>>>>>>>>>> not have new log for the incoming FetchRequest?
> > > > > > >>>>>>>>>>>> Hmm, is there a typo in this question?  Maybe you
> meant
> > > to
> > > > > ask what
> > > > > > >>>>>>>>>>>> happens if there is no new cache slot for the
> incoming
> > > > > FetchRequest?
> > > > > > >>>>>>>>>>>> That's not an error-- the incremental fetch session
> ID
> > > just
> > > > > gets set to
> > > > > > >>>>>>>>>>>> 0, indicating no incremental fetch session was
> created.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> Yeah there is a typo. You have answered my question.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 5. Can you clarify what happens if follower adds a
> > > > > partition to the
> > > > > > >>>>>>>>>>>>> ReplicaFetcherThread after receiving
> > > LeaderAndIsrRequest?
> > > > > Does leader
> > > > > > >>>>>>>>>>>>> needs to generate a new session for this
> > > > > ReplicaFetcherThread or
> > > > > > >>>>>>>>>> does it
> > > > > > >>>>>>>>>>>> re-use
> > > > > > >>>>>>>>>>>>> the existing session?  If it uses a new session,
> is the
> > > > > old session
> > > > > > >>>>>>>>>>>>> actively deleted from the slot?
> > > > > > >>>>>>>>>>>> The basic idea is that you can't make changes,
> except by
> > > > > sending a full
> > > > > > >>>>>>>>>>>> fetch request.  However, perhaps we can allow the
> > > client to
> > > > > re-use its
> > > > > > >>>>>>>>>>>> existing session ID.  If the client sets sessionId
> = id,
> > > > > epoch = 0, it
> > > > > > >>>>>>>>>>>> could re-initialize the session.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> Yeah I agree with the basic idea. We probably want to
> > > > > understand more
> > > > > > >>>>>>>>>>> detail about how this works later.
> > > > > > >>>>>>>>>> Sounds good.  I updated the KIP with this
> information.  A
> > > > > > >>>>>>>>>> re-initialization should be exactly the same as an
> > > > > initialization,
> > > > > > >>>>>>>>>> except that it reuses an existing ID.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> best,
> > > > > > >>>>>>>>>> Colin
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can
> include
> > > the
> > > > > example
> > > > > > >>>>>>>>>> workflow
> > > > > > >>>>>>>>>>>>> of how this feature will be used in case of
> partition
> > > > > change and so
> > > > > > >>>>>>>>>> on.
> > > > > > >>>>>>>>>>>> Yeah, that might help.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> best,
> > > > > > >>>>>>>>>>>> Colin
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>>> Dong
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<
> > > > > cmccabe@apache.org>
> > > > > > >>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I updated the KIP with the ideas we've been
> > > discussing.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> best,
> > > > > > >>>>>>>>>>>>>> Colin
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe
> wrote:
> > > > > > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak
> wrote:
> > > > > > >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can
> become a
> > > > > really
> > > > > > >>>>>>>>>> useful
> > > > > > >>>>>>>>>>>> thing.
> > > > > > >>>>>>>>>>>>>>>> I just scanned through the discussion so far and
> > > wanted
> > > > > to
> > > > > > >>>>>>>>>> start a
> > > > > > >>>>>>>>>>>>>>>> thread to make as decision about keeping the
> > > > > > >>>>>>>>>>>>>>>> cache with the Connection / Session or having
> some
> > > sort
> > > > > of UUID
> > > > > > >>>>>>>>>>>> indN
> > > > > > >>>>>>>>>>>>>> exed
> > > > > > >>>>>>>>>>>>>>>> global Map.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Sorry if that has been settled already and I
> missed
> > > it.
> > > > > In this
> > > > > > >>>>>>>>>>>> case
> > > > > > >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> > > > > > >>>>>>>>>>>>>>> Hi Jan,
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I don't think anyone has discussed the idea of
> tying
> > > the
> > > > > cache
> > > > > > >>>>>>>>>> to an
> > > > > > >>>>>>>>>>>>>>> individual TCP session yet.  I agree that since
> the
> > > > > cache is
> > > > > > >>>>>>>>>>>> intended to
> > > > > > >>>>>>>>>>>>>>> be used only by a single follower or client,
> it's an
> > > > > interesting
> > > > > > >>>>>>>>>>>> thing
> > > > > > >>>>>>>>>>>>>>> to think about.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever
> > > your
> > > > > TCP
> > > > > > >>>>>>>>>> session
> > > > > > >>>>>>>>>>>>>>> drops, you have to make a full fetch request
> rather
> > > than
> > > > > an
> > > > > > >>>>>>>>>>>> incremental
> > > > > > >>>>>>>>>>>>>>> one.  It's not clear to me how often this
> happens in
> > > > > practice --
> > > > > > >>>>>>>>>> it
> > > > > > >>>>>>>>>>>>>>> probably depends a lot on the quality of the
> network.
> > > > > From a
> > > > > > >>>>>>>>>> code
> > > > > > >>>>>>>>>>>>>>> perspective, it might also be a bit difficult to
> > > access
> > > > > data
> > > > > > >>>>>>>>>>>> associated
> > > > > > >>>>>>>>>>>>>>> with the Session from classes like KafkaApis
> > > (although
> > > > > we could
> > > > > > >>>>>>>>>>>> refactor
> > > > > > >>>>>>>>>>>>>>> it to make this easier).
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> It's also clear that even if we tie the cache to
> the
> > > > > session, we
> > > > > > >>>>>>>>>>>> still
> > > > > > >>>>>>>>>>>>>>> have to have limits on the number of caches we're
> > > > > willing to
> > > > > > >>>>>>>>>> create.
> > > > > > >>>>>>>>>>>>>>> And probably we should reserve some cache slots
> for
> > > each
> > > > > > >>>>>>>>>> follower, so
> > > > > > >>>>>>>>>>>>>>> that clients don't take all of them.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Id rather see a protocol in which the client is
> > > hinting
> > > > > the
> > > > > > >>>>>>>>>> broker
> > > > > > >>>>>>>>>>>>>> that,
> > > > > > >>>>>>>>>>>>>>>> he is going to use the feature instead of a
> client
> > > > > > >>>>>>>>>>>>>>>> realizing that the broker just offered the
> feature
> > > > > (regardless
> > > > > > >>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>>>> protocol version which should only indicate
> that the
> > > > > feature
> > > > > > >>>>>>>>>>>>>>>> would be usable).
> > > > > > >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."
> I do
> > > > > think that
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> server should have the option of not accepting
> > > > > incremental
> > > > > > >>>>>>>>>> requests
> > > > > > >>>>>>>>>>>> from
> > > > > > >>>>>>>>>>>>>>> specific clients, in order to save memory space.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> This seems to work better with a per
> > > > > > >>>>>>>>>>>>>>>> connection/session attached Metadata than with
> a Map
> > > > > and could
> > > > > > >>>>>>>>>>>> allow
> > > > > > >>>>>>>>>>>>>> for
> > > > > > >>>>>>>>>>>>>>>> easier client implementations.
> > > > > > >>>>>>>>>>>>>>>> It would also make Client-side code easier as
> there
> > > > > wouldn't
> > > > > > >>>>>>>>>> be any
> > > > > > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > > >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss
> > > responses, I
> > > > > agree.
> > > > > > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most of
> our
> > > > > client-side
> > > > > > >>>>>>>>>> code.
> > > > > > >>>>>>>>>>>>>>> For example, when the Producer creates a message
> and
> > > > > hands it
> > > > > > >>>>>>>>>> off to
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently
> re-connect
> > > and
> > > > > re-send a
> > > > > > >>>>>>>>>>>>>>> message if the first send failed.  The
> higher-level
> > > code
> > > > > will
> > > > > > >>>>>>>>>> not be
> > > > > > >>>>>>>>>>>>>>> informed about whether the TCP session was
> > > > > re-established,
> > > > > > >>>>>>>>>> whether an
> > > > > > >>>>>>>>>>>>>>> existing TCP session was used, and so on.  So
> > > overall I
> > > > > would
> > > > > > >>>>>>>>>> still
> > > > > > >>>>>>>>>>>> lean
> > > > > > >>>>>>>>>>>>>>> towards not coupling this to the TCP session...
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> best,
> > > > > > >>>>>>>>>>>>>>> Colin
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>       Thank you again for the KIP. And again, if
> > > this
> > > > > was clarified
> > > > > > >>>>>>>>>>>> already
> > > > > > >>>>>>>>>>>>>>>> please drop me a hint where I could read about
> it.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Best Jan
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and
> > > latency
> > > > > of
> > > > > > >>>>>>>>>>>>>> FetchRequest:
> > > > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> > > confluence/display/KAFKA/KIP-
> > > > > > >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+
> > > FetchRequests+to+Increase+
> > > > > > >>>>>>>>>>>>>> Partition+Scalability
> > > > > > >>>>>>>>>>>>>>>>> Please take a look.
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> cheers,
> > > > > > >>>>>>>>>>>>>>>>> Colin
> > > > > >
> > > > >
> > > > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message