kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Becket Qin <becket....@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Thu, 04 Jan 2018 01:42:36 GMT
Hi Colin,

I see. I was under the impression that the sessions are immutable and they
are actually not. Then I don't have further concerns over the KIP. We can
incrementally do the future optimization.

One minor thing, the KIP is still using epoch instead of sequence number in
some places. We may want to replace them to avoid confusion.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jan 3, 2018 at 9:37 AM, Colin McCabe <cmccabe@apache.org> wrote:

> On Tue, Jan 2, 2018, at 23:49, Becket Qin wrote:
> > Thanks for the reply, Colin.
> >
> > My concern for the reinitialization is potential churn rather than
> > efficiency. The current KIP proposal uses the time and priority based
> > protection to avoid thrashing, but it is not clear to me if that is
> > sufficient. For example, consider topic creation/deletion. In those
> cases,
> > a lot of the replica fetchers will potentially need to re-establish the
> > session. And there might be many client session got evicted. And thus
> again
> > need to re-establish sessions. This would involve two round trips (due to
> > InvalidFetchSessionException), potential metadata refresh and backoff.
>
> Hi Becket,
>
> When a fetcher is adding or removing a partition, it can re-use its
> existing fetch session.  There is no cache churn, and nobody gets evicted,
> in this case.  The fetcher just has to send a full fetch request to
> establish what it wants the new partition set to be.
>
> best,
> Colin
>
> >
> > Admittedly it is probably not going to be worse than what we have now,
> but
> > such uncertain impact still worries me. Are we going to have the follow
> up
> > optimization discussion before the implementation of this KIP or are we
> > going to do it after? In the past we used to have separate KIPs for a
> > complicated feature but implement them together. Perhaps we can do the
> same
> > here if you want to limit the scope of this KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Tue, Jan 2, 2018 at 6:34 PM, Colin McCabe <cmccabe@apache.org> wrote:
> >
> > > On Tue, Jan 2, 2018, at 04:46, Becket Qin wrote:
> > > > Hi Colin,
> > > >
> > > > Good point about KIP-226. Maybe a separate broker epoch is needed
> > > although
> > > > it is a little awkward to let the consumer set this. So was there a
> > > > solution to the frequent pause and resume scenario? Did I miss
> something?
> > > >
> > > > Thanks,
> > > > Jiangjie (Becket) Qin
> > >
> > > Hi Becket,
> > >
> > > Allowing sessions to be re-initialized (as the current KIP does) makes
> > > frequent pauses and resumes less painful, because the memory associated
> > > with the old session can be reclaimed.  The only cost is sending a full
> > > fetch request once when the pause or resume is activated.
> > >
> > > There are other follow-on optimizations that we might want to do later,
> > > like allowing new partitions to be added to existing fetch sessions
> without
> > > a re-initialization, that could make this even more efficient.  But
> that's
> > > not in the current KIP, in order to avoid expanding the scope too much.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe <cmccabe@apache.org>
> > > wrote:
> > > >
> > > > > On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > > > > > Hi Colin,
> > > > > >
> > > > > > Thanks for the explanation. I want to clarify a bit more on my
> > > thoughts.
> > > > > >
> > > > > > I am fine with having a separate discussion as long as the
> follow-up
> > > > > > discussion will be incremental on top of this KIP instead of
> > > override the
> > > > > > protocol in this KIP.
> > > > >
> > > > > Hi Becket,
> > > > >
> > > > > Thanks for the clarification.  I do think that the changes we've
> been
> > > > > discussing would be incremental rather than completely replacing
> what
> > > we've
> > > > > talked about here.  See my responses inline below.
> > > > >
> > > > > >
> > > > > > I completely agree this KIP is useful by itself. That being
> said, we
> > > want
> > > > > > to avoid falling into a "local optimal" solution by just saying
> > > because
> > > > > it
> > > > > > solves the problem in this scope. I think we should also think
> if the
> > > > > > solution aligns with a "global optimal" (systematic optimal)
> > > solution as
> > > > > > well. That is why I brought up other considerations. If they
> turned
> > > out
> > > > > to
> > > > > > be orthogonal and should be addressed separately, that's good.
> But at
> > > > > least
> > > > > > it is worth thinking about the potential connections between
> those
> > > > > things.
> > > > > >
> > > > > > One example of such related consideration is the following two
> > > seemingly
> > > > > > unrelated things:
> > > > > >
> > > > > > 1. I might have missed the discussion, but it seems the concern
> of
> > > the
> > > > > > clients doing frequent pause and resume is still not addressed.
> Since
> > > > > this
> > > > > > is a pretty common use case for applications that want to have
> flow
> > > > > > control, or have prioritized consumption, or get consumption
> > > fairness, we
> > > > > > probably want to see how to handle this case. One of the solution
> > > might
> > > > > be
> > > > > > a long-lived session id spanning the clients' life time.
> > > > > >
> > > > > > 2. KAFKA-6029. The key problem is that the leader wants to know
> if a
> > > > > fetch
> > > > > > request is from a shutting down broker or from a restarted
> broker.
> > > > > >
> > > > > > The connection between those two issues is that both of them
> could be
> > > > > > addressed by having a life-long session id for each client (or
> > > fetcher,
> > > > > to
> > > > > > be more accurate). This may indicate that having a life long
> session
> > > id
> > > > > > might be a "global optimal" solution so it should be considered
> in
> > > this
> > > > > > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may
> either
> > > > > > introduce a broker epoch unnecessarily (which will not be used
> by the
> > > > > > consumers at all) or override what we do in this KIP.
> > > > >
> > > > > Remember that a given follower will have more than one fetch
> session
> > > ID.
> > > > > Each fetcher thread will have its own session ID.  And we will
> > > eventually
> > > > > be able to dynamically add or remove fetcher threads using KIP-226.
> > > > > Therefore, we can't use fetch session IDs to uniquely identify a
> given
> > > > > broker incarnation.  Any time we increase the number of fetcher
> > > threads, a
> > > > > new fetch session ID will show up.
> > > > >
> > > > > If we want to know if a fetch request is from a shutting down
> broker or
> > > > > from a restarted broker, the most straightforward and robust way
> would
> > > > > probably be to add an incarnation number for each broker.  ZK can
> track
> > > > > this number.  This also helps with debugging and logging (you can
> tell
> > > > > "aha-- this request came from the second incarnation, not the
> first."
> > > > >
> > > > > > BTW, to clarify, the main purpose of returning the data at the
> index
> > > > > > boundary was to get the same benefit of efficient incremental
> fetch
> > > for
> > > > > > both low vol and high vol partitions, which is directly related
> to
> > > the
> > > > > > primary goal in this KIP. The other things (such as avoiding
> binary
> > > > > search)
> > > > > > are just potential additional gain, and they are also brought up
> to
> > > see
> > > > > if
> > > > > > that could be a "global optimal" solution.
> > > > >
> > > > > I still think these are separate.  The primary goal of the KIP was
> to
> > > make
> > > > > fetch requests where not all partitions are returning data more
> > > efficient.
> > > > > This isn't really related to the goal of trying to make accessing
> > > > > historical data more efficient.  In most cases, the data we're
> > > accessing is
> > > > > very recent data, and index lookups are not an issue.
> > > > >
> > > > > >
> > > > > > Some other replies below.
> > > > > > >In order for improvements to succeed, I think that it's
> important to
> > > > > > clearly define the scope and goals.  One good example of this
> was the
> > > > > > AdminClient KIP.  We deliberately avoiding ?>discussing new
> > > > > administrative
> > > > > > RPCs in that KIP, in order to limit the scope.  This kept the
> > > discussion
> > > > > > focused on the user interfaces and configuration, rather than on
> the
> > > > > > details of possible >new RPCs.  Once the KIP was completed, it
> was
> > > easy
> > > > > for
> > > > > > us to add new RPCs later in separate KIPs.
> > > > > > Hmm, why is AdminClient is related? All the discussion are about
> how
> > > to
> > > > > > make fetch more efficient, right?
> > > > > >
> > > > > > >Finally, it's not clear that the approach you are proposing is
> the
> > > right
> > > > > > way to go.  I think we would need to have a lot more discussion
> > > about it.
> > > > > > One very big disadvantage is that it couples >what we send back
> on
> > > the
> > > > > wire
> > > > > > tightly to what is on the disk.  It's not clear that we want to
> do
> > > that.
> > > > > > What if we want to change how things are stored in the future?
> How
> > > does
> > > > > > this work with >clients' own concept of fetch sizes?  And so on,
> and
> > > so
> > > > > > on.  This needs its own discussion thread.
> > > > > > That might be true. However, the index file by definition is for
> the
> > > > > files
> > > > > > stored on the disk. So if we decide to change the storage layer
> to
> > > > > > something else, it seems natural to use some other suitable ways
> to
> > > get
> > > > > the
> > > > > > offsets efficiently.
> > > > > >
> > > > > > >There are a lot of simpler solutions that might work as well or
> > > better.
> > > > > > For example, each partition could keep an in-memory LRU cache of
> the
> > > most
> > > > > > recently used offset to file position >mappings.  Or we could
> have a
> > > > > thread
> > > > > > periodically touch the latest page or two of memory in the index
> > > file for
> > > > > > each partition, to make sure that it didn't fall out of the
> cache.
> > > In
> > > > > some
> > > > > > offline >discussions, some of these approaches have looked quite
> > > > > > promising.  I've even seen some good performance numbers for
> > > prototypes.
> > > > > > In any case, it's a separate problem which needs its >own KIP, I
> > > think.
> > > > > > Those are indeed separate discussions. I was not intended to
> discuss
> > > them
> > > > > > in this KIP. Sorry about the confusion.
> > > > > >
> > > > > > Thanks and Merry Christmas,
> > > > >
> > > > > Happy new year.  Sorry if some of my responses are delayed (I'm on
> > > > > vacation).
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > > On Sat, Dec 23, 2017 at 1:16 AM, Colin McCabe <
> cmccabe@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > On Fri, Dec 22, 2017, at 14:31, Becket Qin wrote:
> > > > > > > > >>
> > > > > > > > >> The point I want to make is that avoiding doing binary
> search
> > > on
> > > > > index
> > > > > > > > >> file and avoid reading the log segments during fetch has
> some
> > > > > > > additional
> > > > > > > > >> benefits. So if the solution works for the current KIP, it
> > > might
> > > > > be a
> > > > > > > > >> better choice.
> > > > > > > >
> > > > > > > > >Let's discuss this in a follow-on KIP.
> > > > > > > >
> > > > > > > > If the discussion will potentially change the protocol in the
> > > current
> > > > > > > > proposal. Would it be better to discuss it now instead of in
> a
> > > > > follow-up
> > > > > > > > KIP so we don't have some protocol that immediately requires
> a
> > > > > change.
> > > > > > >
> > > > > > > Hi Becket,
> > > > > > >
> > > > > > > I think that the problem that you are discussing is different
> than
> > > the
> > > > > > > problem this KIP is designed to address.  This KIP is targeted
> at
> > > > > > > eliminating the wastefulness of re-transmitting information
> about
> > > > > > > partitions that haven't changed in every FetchRequest and
> > > > > FetchResponse.
> > > > > > > The problem you are discussing is dealing with situations
> where the
> > > > > index
> > > > > > > file or the data file is not in the page cache, and therefore
> we
> > > take a
> > > > > > > page fault when doing an index lookup.
> > > > > > >
> > > > > > > This KIP is useful and valuable on its own.  For example, if
> you
> > > have
> > > > > > > brokers in a public cloud in different availability zones, you
> may
> > > > > wish to
> > > > > > > minimize the network traffic between them.  Therefore, you
> don't
> > > want
> > > > > every
> > > > > > > FetchRequest between brokers to be a full FetchRequest.  In
> that
> > > case,
> > > > > this
> > > > > > > KIP is very valuable.
> > > > > > >
> > > > > > > In order for improvements to succeed, I think that it's
> important
> > > to
> > > > > > > clearly define the scope and goals.  One good example of this
> was
> > > the
> > > > > > > AdminClient KIP.  We deliberately avoiding discussing new
> > > > > administrative
> > > > > > > RPCs in that KIP, in order to limit the scope.  This kept the
> > > > > discussion
> > > > > > > focused on the user interfaces and configuration, rather than
> on
> > > the
> > > > > > > details of possible new RPCs.  Once the KIP was completed, it
> was
> > > easy
> > > > > for
> > > > > > > us to add new RPCs later in separate KIPs.
> > > > > > >
> > > > > > > While it's clear that there is probably even more we could do
> to
> > > > > optimize
> > > > > > > fetch requests, making them incremental seems like a good first
> > > cut.  I
> > > > > > > deliberately avoided changing the replication protocol in this
> KIP,
> > > > > because
> > > > > > > I think that it's a big enough change as-is.  If we want to
> change
> > > the
> > > > > > > replication protocol in the future, there is nothing preventing
> > > us...
> > > > > and
> > > > > > > this change will be a useful starting point.
> > > > > > >
> > > > > > > Finally, it's not clear that the approach you are proposing is
> the
> > > > > right
> > > > > > > way to go.  I think we would need to have a lot more discussion
> > > about
> > > > > it.
> > > > > > > One very big disadvantage is that it couples what we send back
> on
> > > the
> > > > > wire
> > > > > > > tightly to what is on the disk.  It's not clear that we want
> to do
> > > > > that.
> > > > > > > What if we want to change how things are stored in the
> future?  How
> > > > > does
> > > > > > > this work with clients' own concept of fetch sizes?  And so on,
> > > and so
> > > > > on.
> > > > > > > This needs its own discussion thread.
> > > > > > >
> > > > > > > There are a lot of simpler solutions that might work as well or
> > > better.
> > > > > > > For example, each partition could keep an in-memory LRU cache
> of
> > > the
> > > > > most
> > > > > > > recently used offset to file position mappings.  Or we could
> have a
> > > > > thread
> > > > > > > periodically touch the latest page or two of memory in the
> index
> > > file
> > > > > for
> > > > > > > each partition, to make sure that it didn't fall out of the
> > > cache.  In
> > > > > some
> > > > > > > offline discussions, some of these approaches have looked quite
> > > > > promising.
> > > > > > > I've even seen some good performance numbers for prototypes.
> In
> > > any
> > > > > case,
> > > > > > > it's a separate problem which needs its own KIP, I think.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <
> colin@cmccabe.xyz
> > > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> > > > > > > > > > Sorry for coming back at this so late.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On 11.12.2017 07:12, Colin McCabe wrote:
> > > > > > > > > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> > > > > > > > > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > > > > > > > > > >>> Hi,
> > > > > > > > > > >>>
> > > > > > > > > > >>> sorry for the late reply, busy times :-/
> > > > > > > > > > >>>
> > > > > > > > > > >>> I would ask you one thing maybe. Since the timeout
> > > > > > > > > > >>> argument seems to be settled I have no further
> argument
> > > > > > > > > > >>> form your side except the "i don't want to".
> > > > > > > > > > >>>
> > > > > > > > > > >>> Can you see that connection.max.idle.max is the exact
> > > time
> > > > > > > > > > >>> that expresses "We expect the client to be away for
> this
> > > > > long,
> > > > > > > > > > >>> and come back and continue"?
> > > > > > > > > > >> Hi Jan,
> > > > > > > > > > >>
> > > > > > > > > > >> Sure, connection.max.idle.max is the exact time that
> we
> > > want
> > > > > to
> > > > > > > keep
> > > > > > > > > > >> around a TCP session.  TCP sessions are relatively
> cheap,
> > > so
> > > > > we
> > > > > > > can
> > > > > > > > > > >> afford to keep them around for 10 minutes by default.
> > > > > Incremental
> > > > > > > > > fetch
> > > > > > > > > > >> state is less cheap, so we want to set a shorter
> timeout
> > > for
> > > > > it.
> > > > > > > We
> > > > > > > > > > >> also want new TCP sessions to be able to reuse an
> existing
> > > > > > > incremental
> > > > > > > > > > >> fetch session rather than creating a new one and
> waiting
> > > for
> > > > > the
> > > > > > > old
> > > > > > > > > one
> > > > > > > > > > >> to time out.
> > > > > > > > > > >>
> > > > > > > > > > >>> also clarified some stuff inline
> > > > > > > > > > >>>
> > > > > > > > > > >>> Best Jan
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>> On 05.12.2017 23:14, Colin McCabe wrote:
> > > > > > > > > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > > > > >>>>> Hi Colin
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Addressing the topic of how to manage slots from
> the
> > > other
> > > > > > > thread.
> > > > > > > > > > >>>>> With tcp connections all this comes for free
> > > essentially.
> > > > > > > > > > >>>> Hi Jan,
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> I don't think that it's accurate to say that cache
> > > > > management
> > > > > > > > > "comes for
> > > > > > > > > > >>>> free" by coupling the incremental fetch session with
> > > the TCP
> > > > > > > > > session.
> > > > > > > > > > >>>> When a new TCP session is started by a fetch
> request,
> > > you
> > > > > still
> > > > > > > > > have to
> > > > > > > > > > >>>> decide whether to grant that request an incremental
> > > fetch
> > > > > > > session or
> > > > > > > > > > >>>> not.  If your answer is that you always grant the
> > > request, I
> > > > > > > would
> > > > > > > > > argue
> > > > > > > > > > >>>> that you do not have cache management.
> > > > > > > > > > >>> First I would say, the client has a big say in this.
> If
> > > the
> > > > > > > client
> > > > > > > > > > >>> is not going to issue incremental he shouldn't ask
> for a
> > > > > cache
> > > > > > > > > > >>> when the client ask for the cache we still have all
> > > options
> > > > > to
> > > > > > > deny.
> > > > > > > > > > >> To put it simply, we have to have some cache
> management
> > > above
> > > > > and
> > > > > > > > > beyond
> > > > > > > > > > >> just giving out an incremental fetch session to
> anyone who
> > > > > has a
> > > > > > > TCP
> > > > > > > > > > >> session.  Therefore, caching does not become simpler
> if
> > > you
> > > > > > > couple the
> > > > > > > > > > >> fetch session to the TCP session.
> > > > > > > > > > Simply giving out an fetch session for everyone with a
> > > > > connection is
> > > > > > > too
> > > > > > > > > > simple,
> > > > > > > > > > but I think it plays well into the idea of consumers
> > > choosing to
> > > > > use
> > > > > > > the
> > > > > > > > > > feature
> > > > > > > > > > therefore only enabling where it brings maximum gains
> > > > > > > > > > (replicas,MirrorMakers)
> > > > > > > > > > >>
> > > > > > > > > > >>>> I guess you could argue that timeouts are cache
> > > management,
> > > > > but
> > > > > > > I
> > > > > > > > > don't
> > > > > > > > > > >>>> find that argument persuasive.  Anyone could just
> > > create a
> > > > > lot
> > > > > > > of
> > > > > > > > > TCP
> > > > > > > > > > >>>> sessions and use a lot of resources, in that case.
> So
> > > > > there is
> > > > > > > > > > >>>> essentially no limit on memory use.  In any case,
> TCP
> > > > > sessions
> > > > > > > don't
> > > > > > > > > > >>>> help us implement fetch session timeouts.
> > > > > > > > > > >>> We still have all the options denying the request to
> > > keep the
> > > > > > > state.
> > > > > > > > > > >>> What you want seems like a max connections / ip
> > > safeguard.
> > > > > > > > > > >>> I can currently take down a broker with to many
> > > connections
> > > > > > > easily.
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>>>> I still would argue we disable it by default and
> make a
> > > > > flag
> > > > > > > in the
> > > > > > > > > > >>>>> broker to ask the leader to maintain the cache
> while
> > > > > > > replicating
> > > > > > > > > and also only
> > > > > > > > > > >>>>> have it optional in consumers (default to off) so
> one
> > > can
> > > > > turn
> > > > > > > it
> > > > > > > > > on
> > > > > > > > > > >>>>> where it really hurts.  MirrorMaker and audit
> consumers
> > > > > > > > > prominently.
> > > > > > > > > > >>>> I agree with Jason's point from earlier in the
> thread.
> > > > > Adding
> > > > > > > extra
> > > > > > > > > > >>>> configuration knobs that aren't really necessary can
> > > harm
> > > > > > > usability.
> > > > > > > > > > >>>> Certainly asking people to manually turn on a
> feature
> > > > > "where it
> > > > > > > > > really
> > > > > > > > > > >>>> hurts" seems to fall in that category, when we could
> > > easily
> > > > > > > enable
> > > > > > > > > it
> > > > > > > > > > >>>> automatically for them.
> > > > > > > > > > >>> This doesn't make much sense to me.
> > > > > > > > > > >> There are no tradeoffs to think about from the
> client's
> > > point
> > > > > of
> > > > > > > view:
> > > > > > > > > > >> it always wants an incremental fetch session.  So
> there
> > > is no
> > > > > > > benefit
> > > > > > > > > to
> > > > > > > > > > >> making the clients configure an extra setting.
> Updating
> > > and
> > > > > > > managing
> > > > > > > > > > >> client configurations is also more difficult than
> managing
> > > > > broker
> > > > > > > > > > >> configurations for most users.
> > > > > > > > > > >>
> > > > > > > > > > >>> You also wanted to implement
> > > > > > > > > > >>> a "turn of in case of bug"-knob. Having the client
> > > indicate
> > > > > if
> > > > > > > the
> > > > > > > > > > >>> feauture will be used seems reasonable to me.,
> > > > > > > > > > >> True.  However, if there is a bug, we could also roll
> > > back the
> > > > > > > client,
> > > > > > > > > > >> so having this configuration knob is not strictly
> > > required.
> > > > > > > > > > >>
> > > > > > > > > > >>>>> Otherwise I left a few remarks in-line, which
> should
> > > help
> > > > > to
> > > > > > > > > understand
> > > > > > > > > > >>>>> my view of the situation better
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Best Jan
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > > > > > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > > > > > >>>>>>>>> Thanks for the explanation, Colin. A few more
> > > > > questions.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>>> The session epoch is not complex.  It's just a
> > > number
> > > > > > > which
> > > > > > > > > increments
> > > > > > > > > > >>>>>>>>>> on each incremental fetch.  The session epoch
> is
> > > also
> > > > > > > useful
> > > > > > > > > for
> > > > > > > > > > >>>>>>>>>> debugging-- it allows you to match up
> requests and
> > > > > > > responses
> > > > > > > > > when
> > > > > > > > > > >>>>>>>>>> looking at log files.
> > > > > > > > > > >>>>>>>>> Currently each request in Kafka has a
> correlation
> > > id to
> > > > > > > help
> > > > > > > > > match the
> > > > > > > > > > >>>>>>>>> requests and responses. Is epoch doing
> something
> > > > > > > differently?
> > > > > > > > > > >>>>>>>> Hi Becket,
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> The correlation ID is used within a single TCP
> > > session,
> > > > > to
> > > > > > > > > uniquely
> > > > > > > > > > >>>>>>>> associate a request with a response.  The
> > > correlation
> > > > > ID is
> > > > > > > not
> > > > > > > > > unique
> > > > > > > > > > >>>>>>>> (and has no meaning) outside the context of that
> > > single
> > > > > TCP
> > > > > > > > > session.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP
> > > > > sessions,
> > > > > > > and
> > > > > > > > > generally
> > > > > > > > > > >>>>>>>> tries to hide that information from the upper
> > > layers of
> > > > > the
> > > > > > > > > code.  So
> > > > > > > > > > >>>>>>>> when you submit a request to NetworkClient, you
> > > don't
> > > > > know
> > > > > > > if
> > > > > > > > > that
> > > > > > > > > > >>>>>>>> request creates a TCP session, or reuses an
> existing
> > > > > one.
> > > > > > > > > > >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the
> > > client
> > > > > > > misses
> > > > > > > > > an
> > > > > > > > > > >>>>>>>>>> increment fetch response about a partition.
> And
> > > then
> > > > > the
> > > > > > > > > partition is
> > > > > > > > > > >>>>>>>>>> never updated after that.  The client has no
> way
> > > to
> > > > > know
> > > > > > > > > about the
> > > > > > > > > > >>>>>>>>>> partition, since it won't be included in any
> > > future
> > > > > > > > > incremental fetch
> > > > > > > > > > >>>>>>>>>> responses.  And there are no offsets to
> compare,
> > > > > since the
> > > > > > > > > partition is
> > > > > > > > > > >>>>>>>>>> simply omitted from the response.
> > > > > > > > > > >>>>>>>>> I am curious about in which situation would the
> > > > > follower
> > > > > > > miss
> > > > > > > > > a response
> > > > > > > > > > >>>>>>>>> of a partition. If the entire FetchResponse is
> lost
> > > > > (e.g.
> > > > > > > > > timeout), the
> > > > > > > > > > >>>>>>>>> follower would disconnect and retry. That will
> > > result
> > > > > in
> > > > > > > > > sending a full
> > > > > > > > > > >>>>>>>>> FetchRequest.
> > > > > > > > > > >>>>>>>> Basically, you are proposing that we rely on
> TCP for
> > > > > > > reliable
> > > > > > > > > delivery
> > > > > > > > > > >>>>>>>> in a distributed system.  That isn't a good idea
> > > for a
> > > > > > > bunch of
> > > > > > > > > > >>>>>>>> different reasons.  First of all, TCP timeouts
> tend
> > > to
> > > > > be
> > > > > > > very
> > > > > > > > > long.  So
> > > > > > > > > > >>>>>>>> if the TCP session timing out is your error
> > > detection
> > > > > > > > > mechanism, you
> > > > > > > > > > >>>>>>>> have to wait minutes for messages to timeout.
> Of
> > > > > course, we
> > > > > > > > > add a
> > > > > > > > > > >>>>>>>> timeout on top of that after which we declare
> the
> > > > > connection
> > > > > > > > > bad and
> > > > > > > > > > >>>>>>>> manually close it.  But just because the
> session is
> > > > > closed
> > > > > > > on
> > > > > > > > > one end
> > > > > > > > > > >>>>>>>> doesn't mean that the other end knows that it is
> > > > > closed.  So
> > > > > > > > > the leader
> > > > > > > > > > >>>>>>>> may have to wait quite a long time before TCP
> > > decides
> > > > > that
> > > > > > > yes,
> > > > > > > > > > >>>>>>>> connection X from the follower is dead and not
> > > coming
> > > > > back,
> > > > > > > > > even though
> > > > > > > > > > >>>>>>>> gremlins ate the FIN packet which the follower
> > > > > attempted to
> > > > > > > > > translate.
> > > > > > > > > > >>>>>>>> If the cache state is tied to that TCP session,
> we
> > > have
> > > > > to
> > > > > > > keep
> > > > > > > > > that
> > > > > > > > > > >>>>>>>> cache around for a much longer time than we
> should.
> > > > > > > > > > >>>>>>> Hi,
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> I see this from a different perspective. The
> cache
> > > expiry
> > > > > > > time
> > > > > > > > > > >>>>>>> has the same semantic as idle connection time in
> this
> > > > > > > scenario.
> > > > > > > > > > >>>>>>> It is the time range we expect the client to come
> > > back an
> > > > > > > reuse
> > > > > > > > > > >>>>>>> its broker side state. I would argue that on
> close we
> > > > > would
> > > > > > > get
> > > > > > > > > an
> > > > > > > > > > >>>>>>> extra shot at cleaning up the session state
> early. As
> > > > > > > opposed to
> > > > > > > > > > >>>>>>> always wait for that duration for expiry to
> happen.
> > > > > > > > > > >>>>>> Hi Jan,
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> The idea here is that the incremental fetch cache
> > > expiry
> > > > > time
> > > > > > > can
> > > > > > > > > be
> > > > > > > > > > >>>>>> much shorter than the TCP session timeout.  In
> general
> > > > > the TCP
> > > > > > > > > session
> > > > > > > > > > >>>>>> timeout is common to all TCP connections, and very
> > > long.
> > > > > To
> > > > > > > make
> > > > > > > > > these
> > > > > > > > > > >>>>>> numbers a little more concrete, the TCP session
> > > timeout is
> > > > > > > often
> > > > > > > > > > >>>>>> configured to be 2 hours on Linux.  (See
> > > > > > > > > > >>>>>> https://www.cyberciti.biz/
> tips/linux-increasing-or-
> > > > > > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > > > > > >>>>>> )  The timeout I was proposing for incremental
> fetch
> > > > > sessions
> > > > > > > was
> > > > > > > > > one or
> > > > > > > > > > >>>>>> two minutes at most.
> > > > > > > > > > >>>>> Currently this is taken care of by
> > > > > > > > > > >>>>> connections.max.idle.ms on the broker and
> defaults to
> > > > > > > something
> > > > > > > > > of few
> > > > > > > > > > >>>>> minutes.
> > > > > > > > > > >>>> It is 10 minutes by default, which is longer than
> what
> > > we
> > > > > want
> > > > > > > the
> > > > > > > > > > >>>> incremental fetch session timeout to be.  There's no
> > > reason
> > > > > to
> > > > > > > > > couple
> > > > > > > > > > >>>> these two things.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>> Also something we could let the client change if we
> > > really
> > > > > > > wanted
> > > > > > > > > to.
> > > > > > > > > > >>>>> So there is no need to worry about coupling our
> > > > > implementation
> > > > > > > to
> > > > > > > > > some
> > > > > > > > > > >>>>> timeouts given by the OS, with TCP one always has
> full
> > > > > control
> > > > > > > > > over the worst
> > > > > > > > > > >>>>> times + one gets the extra shot cleaning up early
> when
> > > the
> > > > > > > close
> > > > > > > > > comes through.
> > > > > > > > > > >>>>> Which is the majority of the cases.
> > > > > > > > > > >>>> In the majority of cases, the TCP session will be
> > > > > > > re-established.
> > > > > > > > > In
> > > > > > > > > > >>>> that case, we have to send a full fetch request
> rather
> > > than
> > > > > an
> > > > > > > > > > >>>> incremental fetch request.
> > > > > > > > > > >>> I actually have a hard time believing this. Do you
> have
> > > any
> > > > > > > numbers
> > > > > > > > > of
> > > > > > > > > > >>> any existing production system? Is it the
> virtualisation
> > > > > layer
> > > > > > > > > cutting
> > > > > > > > > > >>> all the connections?
> > > > > > > > > > >>> We see this only on application crashes and restarts
> > > where
> > > > > the
> > > > > > > app
> > > > > > > > > needs
> > > > > > > > > > >>> todo the full anyways
> > > > > > > > > > >>> as it probably continues with stores offsets.
> > > > > > > > > > >> Yes, TCP connections get dropped.  It happens very
> often
> > > in
> > > > > > > production
> > > > > > > > > > >> clusters, actually.  When I was working on Hadoop,
> one of
> > > the
> > > > > most
> > > > > > > > > > >> common questions I heard from newcomers was "why do I
> see
> > > so
> > > > > many
> > > > > > > > > > >> EOFException messages in the logs"?  The other thing
> that
> > > > > happens
> > > > > > > a
> > > > > > > > > lot
> > > > > > > > > > >> is DNS outages or slowness.  Public clouds seem to
> have
> > > even
> > > > > more
> > > > > > > > > > >> unstable networks than the on-premise clusters.  I am
> not
> > > > > sure why
> > > > > > > > > that
> > > > > > > > > > >> is.
> > > > > > > > > > Hadoop has a wiki page on exactly this
> > > > > > > > > > https://wiki.apache.org/hadoop/EOFException
> > > > > > > > > >
> > > > > > > > > > besides user errors they have servers crashing and
> actually
> > > loss
> > > > > of
> > > > > > > > > > connection high on their list.
> > > > > > > > > > In the case of "server goes away" the cache goes with
> it. So
> > > > > nothing
> > > > > > > to
> > > > > > > > > > argue about the cache beeing reused by
> > > > > > > > > > a new connection.
> > > > > > > > > >
> > > > > > > > > > Can you make an argument at which point the epoch would
> be
> > > > > updated
> > > > > > > > > > broker side to maximise re-usage of the cache on
> > > > > > > > > > lost connections. In many cases the epoch would go out of
> > > sync
> > > > > and we
> > > > > > > > > > would need a full fetch anyways. Am I mistaken here?
> > > > > > > > >
> > > > > > > > > The current proposal is that the server can accept multiple
> > > > > requests
> > > > > > > in a
> > > > > > > > > row with the same sequence number.
> > > > > > > > >
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >>
> > > > > > > > > > >>>>>>>> Secondly, from a software engineering
> perspective,
> > > it's
> > > > > not
> > > > > > > a
> > > > > > > > > good idea
> > > > > > > > > > >>>>>>>> to try to tightly tie together TCP and our
> code.  We
> > > > > would
> > > > > > > have
> > > > > > > > > to
> > > > > > > > > > >>>>>>>> rework how we interact with NetworkClient so
> that
> > > we are
> > > > > > > aware
> > > > > > > > > of things
> > > > > > > > > > >>>>>>>> like TCP sessions closing or opening.  We would
> > > have to
> > > > > be
> > > > > > > > > careful
> > > > > > > > > > >>>>>>>> preserve the ordering of incoming messages when
> > > doing
> > > > > things
> > > > > > > > > like
> > > > > > > > > > >>>>>>>> putting incoming requests on to a queue to be
> > > processed
> > > > > by
> > > > > > > > > multiple
> > > > > > > > > > >>>>>>>> threads.  It's just a lot of complexity to add,
> and
> > > > > there's
> > > > > > > no
> > > > > > > > > upside.
> > > > > > > > > > >>>>>>> I see the point here. And I had a small chat with
> > > Dong
> > > > > Lin
> > > > > > > > > already
> > > > > > > > > > >>>>>>> making me aware of this. I tried out the
> approaches
> > > and
> > > > > > > propose
> > > > > > > > > the
> > > > > > > > > > >>>>>>> following:
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> The client start and does a full fetch. It then
> does
> > > > > > > incremental
> > > > > > > > > fetches.
> > > > > > > > > > >>>>>>> The connection to the broker dies and is
> > > re-established
> > > > > by
> > > > > > > > > NetworkClient
> > > > > > > > > > >>>>>>> under the hood.
> > > > > > > > > > >>>>>>> The broker sees an incremental fetch without
> having
> > > > > state =>
> > > > > > > > > returns
> > > > > > > > > > >>>>>>> error:
> > > > > > > > > > >>>>>>> Client sees the error, does a full fetch and goes
> > > back to
> > > > > > > > > incrementally
> > > > > > > > > > >>>>>>> fetching.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> having this 1 additional error round trip is
> > > essentially
> > > > > the
> > > > > > > > > same as
> > > > > > > > > > >>>>>>> when something
> > > > > > > > > > >>>>>>> with the sessions or epoch changed unexpectedly
> to
> > > the
> > > > > client
> > > > > > > > > (say
> > > > > > > > > > >>>>>>> expiry).
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> So its nothing extra added but the conditions are
> > > easier
> > > > > to
> > > > > > > > > evaluate.
> > > > > > > > > > >>>>>>> Especially since we do everything with
> NetworkClient.
> > > > > Other
> > > > > > > > > implementers
> > > > > > > > > > >>>>>>> on the
> > > > > > > > > > >>>>>>> protocol are free to optimizes this and do not
> do the
> > > > > > > errornours
> > > > > > > > > > >>>>>>> roundtrip on the
> > > > > > > > > > >>>>>>> new connection.
> > > > > > > > > > >>>>>>> Its a great plus that the client can know when
> the
> > > error
> > > > > is
> > > > > > > gonna
> > > > > > > > > > >>>>>>> happen. instead of
> > > > > > > > > > >>>>>>> the server to always have to report back if
> something
> > > > > changes
> > > > > > > > > > >>>>>>> unexpectedly for the client
> > > > > > > > > > >>>>>> You are assuming that the leader and the follower
> > > agree
> > > > > that
> > > > > > > the
> > > > > > > > > TCP
> > > > > > > > > > >>>>>> session drops at the same time.  When there are
> > > network
> > > > > > > problems,
> > > > > > > > > this
> > > > > > > > > > >>>>>> may not be true.  The leader may still think the
> > > previous
> > > > > TCP
> > > > > > > > > session is
> > > > > > > > > > >>>>>> active.  In that case, we have to keep the
> incremental
> > > > > fetch
> > > > > > > > > session
> > > > > > > > > > >>>>>> state around until we learn otherwise (which
> could be
> > > up
> > > > > to
> > > > > > > that
> > > > > > > > > 2 hour
> > > > > > > > > > >>>>>> timeout I mentioned).  And if we get a new
> incoming
> > > > > > > incremental
> > > > > > > > > fetch
> > > > > > > > > > >>>>>> request, we can't assume that it replaces the
> previous
> > > > > one,
> > > > > > > > > because the
> > > > > > > > > > >>>>>> IDs will be different (the new one starts a new
> > > session).
> > > > > > > > > > >>>>> As mentioned, no reason to fear some time-outs out
> of
> > > our
> > > > > > > control
> > > > > > > > > > >>>>>>>> Imagine that I made an argument that client IDs
> are
> > > > > > > "complex"
> > > > > > > > > and should
> > > > > > > > > > >>>>>>>> be removed from our APIs.  After all, we can
> just
> > > look
> > > > > at
> > > > > > > the
> > > > > > > > > remote IP
> > > > > > > > > > >>>>>>>> address and TCP port of each connection.  Would
> you
> > > > > think
> > > > > > > that
> > > > > > > > > was a
> > > > > > > > > > >>>>>>>> good idea?  The client ID is useful when
> looking at
> > > > > logs.
> > > > > > > For
> > > > > > > > > example,
> > > > > > > > > > >>>>>>>> if a rebalance is having problems, you want to
> know
> > > what
> > > > > > > > > clients were
> > > > > > > > > > >>>>>>>> having a problem.  So having the client ID
> field to
> > > > > guide
> > > > > > > you is
> > > > > > > > > > >>>>>>>> actually much less "complex" in practice than
> not
> > > > > having an
> > > > > > > ID.
> > > > > > > > > > >>>>>>> I still cant follow why the correlation idea
> will not
> > > > > help
> > > > > > > here.
> > > > > > > > > > >>>>>>> Correlating logs with it usually works great.
> Even
> > > with
> > > > > > > > > primitive tools
> > > > > > > > > > >>>>>>> like grep
> > > > > > > > > > >>>>>> The correlation ID does help somewhat, but
> certainly
> > > not
> > > > > as
> > > > > > > much
> > > > > > > > > as a
> > > > > > > > > > >>>>>> unique 64-bit ID.  The correlation ID is not
> unique
> > > in the
> > > > > > > > > broker, just
> > > > > > > > > > >>>>>> unique to a single NetworkClient.  Simiarly, the
> > > > > correlation
> > > > > > > ID
> > > > > > > > > is not
> > > > > > > > > > >>>>>> unique on the client side, if there are multiple
> > > > > Consumers,
> > > > > > > etc.
> > > > > > > > > > >>>>> Can always bump entropy in correlation IDs, never
> had a
> > > > > problem
> > > > > > > > > > >>>>> of finding to many duplicates. Would be a
> different KIP
> > > > > though.
> > > > > > > > > > >>>>>>>> Similarly, if metadata responses had epoch
> numbers
> > > > > (simple
> > > > > > > > > incrementing
> > > > > > > > > > >>>>>>>> numbers), we would not have to debug problems
> like
> > > > > clients
> > > > > > > > > accidentally
> > > > > > > > > > >>>>>>>> getting old metadata from servers that had been
> > > > > partitioned
> > > > > > > off
> > > > > > > > > from the
> > > > > > > > > > >>>>>>>> network for a while.  Clients would know the
> > > difference
> > > > > > > between
> > > > > > > > > old and
> > > > > > > > > > >>>>>>>> new metadata.  So putting epochs in to the
> metadata
> > > > > request
> > > > > > > is
> > > > > > > > > much less
> > > > > > > > > > >>>>>>>> "complex" operationally, even though it's an
> extra
> > > > > field in
> > > > > > > the
> > > > > > > > > request.
> > > > > > > > > > >>>>>>>>      This has been discussed before on the
> mailing
> > > list.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> So I think the bottom line for me is that
> having the
> > > > > > > session ID
> > > > > > > > > and
> > > > > > > > > > >>>>>>>> session epoch, while it adds two extra fields,
> > > reduces
> > > > > > > > > operational
> > > > > > > > > > >>>>>>>> complexity and increases debuggability.  It
> avoids
> > > > > tightly
> > > > > > > > > coupling us
> > > > > > > > > > >>>>>>>> to assumptions about reliable ordered delivery
> which
> > > > > tend
> > > > > > > to be
> > > > > > > > > violated
> > > > > > > > > > >>>>>>>> in practice in multiple layers of the stack.
> > > Finally,
> > > > > it
> > > > > > > > > avoids the
> > > > > > > > > > >>>>>>>> necessity of refactoring NetworkClient.
> > > > > > > > > > >>>>>>> So there is stacks out there that violate TCP
> > > > > guarantees? And
> > > > > > > > > software
> > > > > > > > > > >>>>>>> still works? How can this be? Can you elaborate a
> > > little
> > > > > > > where
> > > > > > > > > this
> > > > > > > > > > >>>>>>> can be violated? I am not very familiar with
> > > virtualized
> > > > > > > > > environments
> > > > > > > > > > >>>>>>> but they can't really violate TCP contracts.
> > > > > > > > > > >>>>>> TCP's guarantees of reliable, in-order
> transmission
> > > > > certainly
> > > > > > > can
> > > > > > > > > be
> > > > > > > > > > >>>>>> violated.  For example, I once had to debug a
> cluster
> > > > > where a
> > > > > > > > > certain
> > > > > > > > > > >>>>>> node had a network card which corrupted its
> > > transmissions
> > > > > > > > > occasionally.
> > > > > > > > > > >>>>>> With all the layers of checksums, you would think
> that
> > > > > this
> > > > > > > was
> > > > > > > > > not
> > > > > > > > > > >>>>>> possible, but it happened.  We occasionally got
> > > corrupted
> > > > > data
> > > > > > > > > written
> > > > > > > > > > >>>>>> to disk on the other end because of it.  Even more
> > > > > > > frustrating,
> > > > > > > > > the data
> > > > > > > > > > >>>>>> was not corrupted on disk on the sending node-- it
> > > was a
> > > > > bug
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > >>>>>> network card driver that was injecting the errors.
> > > > > > > > > > >>>>> true, but your broker might aswell read a corrupted
> > > 600GB
> > > > > as
> > > > > > > size
> > > > > > > > > from
> > > > > > > > > > >>>>> the network and die with OOM instantly.
> > > > > > > > > > >>>> If you read 600 GB as the size from the network, you
> > > will
> > > > > not
> > > > > > > "die
> > > > > > > > > with
> > > > > > > > > > >>>> OOM instantly."  That would be a bug.  Instead, you
> will
> > > > > notice
> > > > > > > > > that 600
> > > > > > > > > > >>>> GB is greater than max.message.bytes, and close the
> > > > > connection.
> > > > > > > > > > >>> We only check max.message.bytes to late to guard
> against
> > > > > consumer
> > > > > > > > > > >>> stalling.
> > > > > > > > > > >>> we dont have a notion of max.networkpacket.size
> before we
> > > > > > > allocate
> > > > > > > > > the
> > > > > > > > > > >>> bytebuffer to read it into.
> > > > > > > > > > >> "network packets" are not the same thing as "kafka
> > > RPCs."  One
> > > > > > > Kafka
> > > > > > > > > RPC
> > > > > > > > > > >> could take up mutiple ethernet packets.
> > > > > > > > > > >>
> > > > > > > > > > >> Also, max.message.bytes has nothing to do with
> "consumer
> > > > > > > stalling" --
> > > > > > > > > > >> you are probably thinking about some of the fetch
> request
> > > > > > > > > > >> configurations.  max.message.bytes is used by the RPC
> > > system
> > > > > to
> > > > > > > figure
> > > > > > > > > > >> out whether to read the full incoming RP
> > > > > > > > > > > Whoops, this is incorrect.  I was thinking about
> > > > > > > > > > > "socket.request.max.bytes" rather than
> "max.message.bytes."
> > > > > Sorry
> > > > > > > > > about
> > > > > > > > > > > that.  See Ismael's email as well.
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > >> best,
> > > > > > > > > > >> Colin
> > > > > > > > > > >>
> > > > > > > > > > >>>>> Optimizing for still having functional
> > > > > > > > > > >>>>> software under this circumstances is not
> reasonable.
> > > > > > > > > > >>>>> You want to get rid of such a
> > > > > > > > > > >>>>> node ASAP and pray that zookeepers ticks get
> corrupted
> > > > > often
> > > > > > > enough
> > > > > > > > > > >>>>> that it finally drops out of the cluster.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> There is a good reason that these kinda things
> > > > > > > > > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > > > > > >>>>> don't end up as kafka Jiras. In the end you can't
> run
> > > any
> > > > > > > software
> > > > > > > > > in
> > > > > > > > > > >>>>> these containers anymore. Application layer
> checksums
> > > are a
> > > > > > > neat
> > > > > > > > > thing to
> > > > > > > > > > >>>>> fail fast but trying to cope with this probably
> causes
> > > > > more bad
> > > > > > > > > than
> > > > > > > > > > >>>>> good.  So I would argue that we shouldn't try this
> for
> > > the
> > > > > > > fetch
> > > > > > > > > requests.
> > > > > > > > > > >>>> One of the goals of Apache Kafka is to be "a
> streaming
> > > > > > > platform...
> > > > > > > > > > >>>> [that] lets you store streams of records in a
> > > fault-tolerant
> > > > > > > way."
> > > > > > > > > For
> > > > > > > > > > >>>> more information, see
> https://kafka.apache.org/intro .
> > > > > > > > > Fault-tolerance
> > > > > > > > > > >>>> is explicitly part of the goal of Kafka.  Prayer
> should
> > > be
> > > > > > > > > optional, not
> > > > > > > > > > >>>> required, when running the software.
> > > > > > > > > > >>> Yes, we need to fail ASAP when we read corrupted
> > > packages. It
> > > > > > > seemed
> > > > > > > > > > >>> to me like you tried to make the case for pray and
> try to
> > > > > stay
> > > > > > > alive.
> > > > > > > > > > >>> Fault
> > > > > > > > > > >>> tolerance here means. I am a fishy box i am going to
> let
> > > a
> > > > > good
> > > > > > > box
> > > > > > > > > > >>> handle
> > > > > > > > > > >>> it and be silent until i get fixed up.
> > > > > > > > > > >>>> Crashing because someone sent you a bad packet is
> not
> > > > > reasonable
> > > > > > > > > > >>>> behavior.  It is a bug.  Similarly, bringing down
> the
> > > whole
> > > > > > > cluster,
> > > > > > > > > > >>>> which could a hundred nodes, because someone had a
> bad
> > > > > network
> > > > > > > > > adapter
> > > > > > > > > > >>>> is not reasonable behavior.  It is perhaps
> reasonable
> > > for
> > > > > the
> > > > > > > > > cluster to
> > > > > > > > > > >>>> perform worse when hardware is having problems.  But
> > > that's
> > > > > a
> > > > > > > > > different
> > > > > > > > > > >>>> discussion.
> > > > > > > > > > >>> See above.
> > > > > > > > > > >>>> best,
> > > > > > > > > > >>>> Colin
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>>> However, my point was not about TCP's guarantees
> being
> > > > > > > violated.
> > > > > > > > > My
> > > > > > > > > > >>>>>> point is that TCP's guarantees are only one small
> > > building
> > > > > > > block
> > > > > > > > > to
> > > > > > > > > > >>>>>> build a robust distributed system.  TCP basically
> just
> > > > > says
> > > > > > > that
> > > > > > > > > if you
> > > > > > > > > > >>>>>> get any bytes from the stream, you will get the
> ones
> > > that
> > > > > were
> > > > > > > > > sent by
> > > > > > > > > > >>>>>> the sender, in the order they were sent.  TCP
> does not
> > > > > > > guarantee
> > > > > > > > > that
> > > > > > > > > > >>>>>> the bytes you send will get there.  It does not
> > > guarantee
> > > > > > > that if
> > > > > > > > > you
> > > > > > > > > > >>>>>> close the connection, the other end will know
> about
> > > it in
> > > > > a
> > > > > > > timely
> > > > > > > > > > >>>>>> fashion.
> > > > > > > > > > >>>>> These are very powerful grantees and since we use
> TCP
> > > we
> > > > > should
> > > > > > > > > > >>>>> piggy pack everything that is reasonable on to it.
> IMO
> > > > > there
> > > > > > > is no
> > > > > > > > > > >>>>> need to reimplement correct sequencing again if
> you get
> > > > > that
> > > > > > > from
> > > > > > > > > > >>>>> your transport layer. It saves you the complexity,
> it
> > > makes
> > > > > > > > > > >>>>> you application behave way more naturally and your
> api
> > > > > easier
> > > > > > > to
> > > > > > > > > > >>>>> understand.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> There is literally nothing the Kernel wont let you
> > > decide
> > > > > > > > > > >>>>> especially not any timings. Only noticeable
> exception
> > > being
> > > > > > > > > TIME_WAIT
> > > > > > > > > > >>>>> of usually 240 seconds but that already has little
> todo
> > > > > with
> > > > > > > the
> > > > > > > > > broker
> > > > > > > > > > >>>>> itself and
> > > > > > > > > > >>>>> if we are running out of usable ports because of
> this
> > > then
> > > > > > > expiring
> > > > > > > > > > >>>>> fetch requests
> > > > > > > > > > >>>>> wont help much anyways.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I hope I could strengthen the trust you have in
> > > userland
> > > > > TCP
> > > > > > > > > connection
> > > > > > > > > > >>>>> management. It is really powerful and can be
> exploited
> > > for
> > > > > > > maximum
> > > > > > > > > gains
> > > > > > > > > > >>>>> without much risk in my opinion.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>> It does not guarantee that the bytes will be
> received
> > > in a
> > > > > > > > > > >>>>>> certain timeframe, and certainly doesn't guarantee
> > > that
> > > > > if you
> > > > > > > > > send a
> > > > > > > > > > >>>>>> byte on connection X and then on connection Y,
> that
> > > the
> > > > > remote
> > > > > > > > > end will
> > > > > > > > > > >>>>>> read a byte on X before reading a byte on Y.
> > > > > > > > > > >>>>> Noone expects this from two independent paths of
> any
> > > kind.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>> best,
> > > > > > > > > > >>>>>> Colin
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> Hope this made my view clearer, especially the
> first
> > > > > part.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Best Jan
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>> best,
> > > > > > > > > > >>>>>>>> Colin
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>> If there is an error such as
> NotLeaderForPartition
> > > is
> > > > > > > > > > >>>>>>>>> returned for some partitions, the follower can
> > > always
> > > > > send
> > > > > > > a
> > > > > > > > > full
> > > > > > > > > > >>>>>>>>> FetchRequest. Is there a scenario that only
> some
> > > of the
> > > > > > > > > partitions in a
> > > > > > > > > > >>>>>>>>> FetchResponse is lost?
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> Thanks,
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> Jiangjie (Becket) Qin
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > > > > > cmccabe@apache.org>  wrote:
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > > > > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin
> McCabe<
> > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > >>>>>>>>>> wrote:
> > > > > > > > > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin
> wrote:
> > > > > > > > > > >>>>>>>>>>>>> Hey Colin,
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> Thanks much for the update. I have a few
> > > questions
> > > > > > > below:
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch
> > > Session
> > > > > > > Epoch. It
> > > > > > > > > seems that
> > > > > > > > > > >>>>>>>>>>>>> Fetch
> > > > > > > > > > >>>>>>>>>>>>> Session Epoch is only needed to help leader
> > > > > distinguish
> > > > > > > > > between "a
> > > > > > > > > > >>>>>>>>>> full
> > > > > > > > > > >>>>>>>>>>>>> fetch request" and "a full fetch request
> and
> > > > > request a
> > > > > > > new
> > > > > > > > > > >>>>>>>>>> incremental
> > > > > > > > > > >>>>>>>>>>>>> fetch session". Alternatively, follower can
> > > also
> > > > > > > indicate
> > > > > > > > > "a full
> > > > > > > > > > >>>>>>>>>> fetch
> > > > > > > > > > >>>>>>>>>>>>> request and request a new incremental fetch
> > > > > session" by
> > > > > > > > > setting Fetch
> > > > > > > > > > >>>>>>>>>>>>> Session ID to -1 without using Fetch
> Session
> > > Epoch.
> > > > > > > Does
> > > > > > > > > this make
> > > > > > > > > > >>>>>>>>>> sense?
> > > > > > > > > > >>>>>>>>>>>> Hi Dong,
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> The fetch session epoch is very important
> for
> > > > > ensuring
> > > > > > > > > correctness.  It
> > > > > > > > > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data
> due
> > > to
> > > > > > > network
> > > > > > > > > reordering
> > > > > > > > > > >>>>>>>>>> or
> > > > > > > > > > >>>>>>>>>>>> loss.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> For example, consider a scenario where the
> > > follower
> > > > > > > sends a
> > > > > > > > > fetch
> > > > > > > > > > >>>>>>>>>>>> request to the leader.  The leader responds,
> > > but the
> > > > > > > > > response is lost
> > > > > > > > > > >>>>>>>>>>>> because of network problems which affected
> the
> > > TCP
> > > > > > > > > session.  In that
> > > > > > > > > > >>>>>>>>>>>> case, the follower must establish a new TCP
> > > session
> > > > > and
> > > > > > > > > re-send the
> > > > > > > > > > >>>>>>>>>>>> incremental fetch request.  But the leader
> does
> > > not
> > > > > know
> > > > > > > > > that the
> > > > > > > > > > >>>>>>>>>>>> follower didn't receive the previous
> incremental
> > > > > fetch
> > > > > > > > > response.  It is
> > > > > > > > > > >>>>>>>>>>>> only the incremental fetch epoch which lets
> the
> > > > > leader
> > > > > > > know
> > > > > > > > > that it
> > > > > > > > > > >>>>>>>>>>>> needs to resend that data, and not data
> which
> > > comes
> > > > > > > > > afterwards.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> You could construct similar scenarios with
> > > message
> > > > > > > > > reordering,
> > > > > > > > > > >>>>>>>>>>>> duplication, etc.  Basically, this is a
> stateful
> > > > > > > protocol
> > > > > > > > > on an
> > > > > > > > > > >>>>>>>>>>>> unreliable network, and you need to know
> > > whether the
> > > > > > > > > follower got the
> > > > > > > > > > >>>>>>>>>>>> previous data you sent before you move on.
> And
> > > you
> > > > > > > need to
> > > > > > > > > handle
> > > > > > > > > > >>>>>>>>>>>> issues like duplicated or delayed requests.
> > > These
> > > > > > > issues
> > > > > > > > > do not affect
> > > > > > > > > > >>>>>>>>>>>> the full fetch request, because it is not
> > > > > stateful-- any
> > > > > > > > > full fetch
> > > > > > > > > > >>>>>>>>>>>> request can be understood and properly
> > > responded to
> > > > > in
> > > > > > > > > isolation.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Thanks for the explanation. This makes
> sense. On
> > > the
> > > > > > > other
> > > > > > > > > hand I would
> > > > > > > > > > >>>>>>>>>>> be interested in learning more about whether
> > > Becket's
> > > > > > > > > solution can help
> > > > > > > > > > >>>>>>>>>>> simplify the protocol by not having the echo
> > > field
> > > > > and
> > > > > > > > > whether that is
> > > > > > > > > > >>>>>>>>>>> worth doing.
> > > > > > > > > > >>>>>>>>>> Hi Dong,
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> I commented about this in the other thread.  A
> > > > > solution
> > > > > > > which
> > > > > > > > > doesn't
> > > > > > > > > > >>>>>>>>>> maintain session information doesn't work
> here.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest
> > > will
> > > > > > > include
> > > > > > > > > partitions
> > > > > > > > > > >>>>>>>>>> whose
> > > > > > > > > > >>>>>>>>>>>>> fetch offset or maximum number of fetch
> bytes
> > > has
> > > > > been
> > > > > > > > > changed. If
> > > > > > > > > > >>>>>>>>>>>>> follower's logStartOffet of a partition has
> > > > > changed,
> > > > > > > > > should this
> > > > > > > > > > >>>>>>>>>>>>> partition also be included in the next
> > > > > FetchRequest to
> > > > > > > the
> > > > > > > > > leader?
> > > > > > > > > > >>>>>>>>>>>> Otherwise, it
> > > > > > > > > > >>>>>>>>>>>>> may affect the handling of
> DeleteRecordsRequest
> > > > > because
> > > > > > > > > leader may
> > > > > > > > > > >>>>>>>>>> not
> > > > > > > > > > >>>>>>>>>>>> know
> > > > > > > > > > >>>>>>>>>>>>> the corresponding data has been deleted on
> the
> > > > > > > follower.
> > > > > > > > > > >>>>>>>>>>>> Yeah, the follower should include the
> partition
> > > if
> > > > > the
> > > > > > > > > logStartOffset
> > > > > > > > > > >>>>>>>>>>>> has changed.  That should be spelled out on
> the
> > > KIP.
> > > > > > > Fixed.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a
> > > > > partition is
> > > > > > > not
> > > > > > > > > considered
> > > > > > > > > > >>>>>>>>>>>>> dirty if its log start offset has changed.
> > > Later
> > > > > in the
> > > > > > > > > section
> > > > > > > > > > >>>>>>>>>>>> "FetchRequest
> > > > > > > > > > >>>>>>>>>>>>> Changes", it is said that incremental fetch
> > > > > responses
> > > > > > > will
> > > > > > > > > include a
> > > > > > > > > > >>>>>>>>>>>>> partition if its logStartOffset has
> changed. It
> > > > > seems
> > > > > > > > > inconsistent.
> > > > > > > > > > >>>>>>>>>> Can
> > > > > > > > > > >>>>>>>>>>>>> you update the KIP to clarify it?
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> In the "Per-Partition Data" section, it
> does say
> > > > > that
> > > > > > > > > logStartOffset
> > > > > > > > > > >>>>>>>>>>>> changes make a partition dirty, though,
> right?
> > > The
> > > > > > > first
> > > > > > > > > bullet point
> > > > > > > > > > >>>>>>>>>>>> is:
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this
> > > > > changes the
> > > > > > > > > log start
> > > > > > > > > > >>>>>>>>>> offset
> > > > > > > > > > >>>>>>>>>>>> of the partition on the leader., or
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Ah I see. I think I didn't notice this
> because
> > > > > statement
> > > > > > > > > assumes that the
> > > > > > > > > > >>>>>>>>>>> LogStartOffset in the leader only changes
> due to
> > > > > > > LogCleaner.
> > > > > > > > > In fact the
> > > > > > > > > > >>>>>>>>>>> LogStartOffset can change on the leader due
> to
> > > > > either log
> > > > > > > > > retention and
> > > > > > > > > > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified
> whether
> > > > > > > LogCleaner
> > > > > > > > > can change
> > > > > > > > > > >>>>>>>>>>> LogStartOffset though. It may be a bit
> better to
> > > > > just say
> > > > > > > > > that a
> > > > > > > > > > >>>>>>>>>>> partition is considered dirty if
> LogStartOffset
> > > > > changes.
> > > > > > > > > > >>>>>>>>>> I agree.  It should be straightforward to just
> > > resend
> > > > > the
> > > > > > > > > partition if
> > > > > > > > > > >>>>>>>>>> logStartOffset changes.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it
> is
> > > said
> > > > > that
> > > > > > > > > each broker
> > > > > > > > > > >>>>>>>>>> has a
> > > > > > > > > > >>>>>>>>>>>>> limited number of slots. How is this number
> > > > > determined?
> > > > > > > > > Does this
> > > > > > > > > > >>>>>>>>>> require
> > > > > > > > > > >>>>>>>>>>>>> a new broker config for this number?
> > > > > > > > > > >>>>>>>>>>>> Good point.  I added two broker
> configuration
> > > > > > > parameters to
> > > > > > > > > control
> > > > > > > > > > >>>>>>>>>> this
> > > > > > > > > > >>>>>>>>>>>> number.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> I am curious to see whether we can avoid
> some of
> > > > > these
> > > > > > > new
> > > > > > > > > configs. For
> > > > > > > > > > >>>>>>>>>>> example, incremental.fetch.session.
> > > > > > > cache.slots.per.broker
> > > > > > > > > is probably
> > > > > > > > > > >>>>>>>>>> not
> > > > > > > > > > >>>>>>>>>>> necessary because if a leader knows that a
> > > > > FetchRequest
> > > > > > > > > comes from a
> > > > > > > > > > >>>>>>>>>>> follower, we probably want the leader to
> always
> > > > > cache the
> > > > > > > > > information
> > > > > > > > > > >>>>>>>>>>> from that follower. Does this make sense?
> > > > > > > > > > >>>>>>>>>> Yeah, maybe we can avoid having
> > > > > > > > > > >>>>>>>>>> incremental.fetch.session.
> cache.slots.per.broker.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Maybe we can discuss the config later after
> > > there is
> > > > > > > > > agreement on how the
> > > > > > > > > > >>>>>>>>>>> protocol would look like.
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> What is the error code if broker does
> > > > > > > > > > >>>>>>>>>>>>> not have new log for the incoming
> FetchRequest?
> > > > > > > > > > >>>>>>>>>>>> Hmm, is there a typo in this question?
> Maybe
> > > you
> > > > > meant
> > > > > > > to
> > > > > > > > > ask what
> > > > > > > > > > >>>>>>>>>>>> happens if there is no new cache slot for
> the
> > > > > incoming
> > > > > > > > > FetchRequest?
> > > > > > > > > > >>>>>>>>>>>> That's not an error-- the incremental fetch
> > > session
> > > > > ID
> > > > > > > just
> > > > > > > > > gets set to
> > > > > > > > > > >>>>>>>>>>>> 0, indicating no incremental fetch session
> was
> > > > > created.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Yeah there is a typo. You have answered my
> > > question.
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> 5. Can you clarify what happens if follower
> > > adds a
> > > > > > > > > partition to the
> > > > > > > > > > >>>>>>>>>>>>> ReplicaFetcherThread after receiving
> > > > > > > LeaderAndIsrRequest?
> > > > > > > > > Does leader
> > > > > > > > > > >>>>>>>>>>>>> needs to generate a new session for this
> > > > > > > > > ReplicaFetcherThread or
> > > > > > > > > > >>>>>>>>>> does it
> > > > > > > > > > >>>>>>>>>>>> re-use
> > > > > > > > > > >>>>>>>>>>>>> the existing session?  If it uses a new
> > > session,
> > > > > is the
> > > > > > > > > old session
> > > > > > > > > > >>>>>>>>>>>>> actively deleted from the slot?
> > > > > > > > > > >>>>>>>>>>>> The basic idea is that you can't make
> changes,
> > > > > except by
> > > > > > > > > sending a full
> > > > > > > > > > >>>>>>>>>>>> fetch request.  However, perhaps we can
> allow
> > > the
> > > > > > > client to
> > > > > > > > > re-use its
> > > > > > > > > > >>>>>>>>>>>> existing session ID.  If the client sets
> > > sessionId
> > > > > = id,
> > > > > > > > > epoch = 0, it
> > > > > > > > > > >>>>>>>>>>>> could re-initialize the session.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Yeah I agree with the basic idea. We probably
> > > want to
> > > > > > > > > understand more
> > > > > > > > > > >>>>>>>>>>> detail about how this works later.
> > > > > > > > > > >>>>>>>>>> Sounds good.  I updated the KIP with this
> > > > > information.  A
> > > > > > > > > > >>>>>>>>>> re-initialization should be exactly the same
> as an
> > > > > > > > > initialization,
> > > > > > > > > > >>>>>>>>>> except that it reuses an existing ID.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> best,
> > > > > > > > > > >>>>>>>>>> Colin
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP
> can
> > > > > include
> > > > > > > the
> > > > > > > > > example
> > > > > > > > > > >>>>>>>>>> workflow
> > > > > > > > > > >>>>>>>>>>>>> of how this feature will be used in case of
> > > > > partition
> > > > > > > > > change and so
> > > > > > > > > > >>>>>>>>>> on.
> > > > > > > > > > >>>>>>>>>>>> Yeah, that might help.
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> best,
> > > > > > > > > > >>>>>>>>>>>> Colin
> > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> Thanks,
> > > > > > > > > > >>>>>>>>>>>>> Dong
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin
> McCabe<
> > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>> I updated the KIP with the ideas we've
> been
> > > > > > > discussing.
> > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>> best,
> > > > > > > > > > >>>>>>>>>>>>>> Colin
> > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin
> McCabe
> > > > > wrote:
> > > > > > > > > > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan
> Filipiak
> > > > > wrote:
> > > > > > > > > > >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it
> can
> > > > > become a
> > > > > > > > > really
> > > > > > > > > > >>>>>>>>>> useful
> > > > > > > > > > >>>>>>>>>>>> thing.
> > > > > > > > > > >>>>>>>>>>>>>>>> I just scanned through the discussion so
> > > far and
> > > > > > > wanted
> > > > > > > > > to
> > > > > > > > > > >>>>>>>>>> start a
> > > > > > > > > > >>>>>>>>>>>>>>>> thread to make as decision about
> keeping the
> > > > > > > > > > >>>>>>>>>>>>>>>> cache with the Connection / Session or
> > > having
> > > > > some
> > > > > > > sort
> > > > > > > > > of UUID
> > > > > > > > > > >>>>>>>>>>>> indN
> > > > > > > > > > >>>>>>>>>>>>>> exed
> > > > > > > > > > >>>>>>>>>>>>>>>> global Map.
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>> Sorry if that has been settled already
> and I
> > > > > missed
> > > > > > > it.
> > > > > > > > > In this
> > > > > > > > > > >>>>>>>>>>>> case
> > > > > > > > > > >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> > > > > > > > > > >>>>>>>>>>>>>>> Hi Jan,
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>> I don't think anyone has discussed the
> idea
> > > of
> > > > > tying
> > > > > > > the
> > > > > > > > > cache
> > > > > > > > > > >>>>>>>>>> to an
> > > > > > > > > > >>>>>>>>>>>>>>> individual TCP session yet.  I agree that
> > > since
> > > > > the
> > > > > > > > > cache is
> > > > > > > > > > >>>>>>>>>>>> intended to
> > > > > > > > > > >>>>>>>>>>>>>>> be used only by a single follower or
> client,
> > > > > it's an
> > > > > > > > > interesting
> > > > > > > > > > >>>>>>>>>>>> thing
> > > > > > > > > > >>>>>>>>>>>>>>> to think about.
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that
> > > whenever
> > > > > > > your
> > > > > > > > > TCP
> > > > > > > > > > >>>>>>>>>> session
> > > > > > > > > > >>>>>>>>>>>>>>> drops, you have to make a full fetch
> request
> > > > > rather
> > > > > > > than
> > > > > > > > > an
> > > > > > > > > > >>>>>>>>>>>> incremental
> > > > > > > > > > >>>>>>>>>>>>>>> one.  It's not clear to me how often this
> > > > > happens in
> > > > > > > > > practice --
> > > > > > > > > > >>>>>>>>>> it
> > > > > > > > > > >>>>>>>>>>>>>>> probably depends a lot on the quality of
> the
> > > > > network.
> > > > > > > > > From a
> > > > > > > > > > >>>>>>>>>> code
> > > > > > > > > > >>>>>>>>>>>>>>> perspective, it might also be a bit
> > > difficult to
> > > > > > > access
> > > > > > > > > data
> > > > > > > > > > >>>>>>>>>>>> associated
> > > > > > > > > > >>>>>>>>>>>>>>> with the Session from classes like
> KafkaApis
> > > > > > > (although
> > > > > > > > > we could
> > > > > > > > > > >>>>>>>>>>>> refactor
> > > > > > > > > > >>>>>>>>>>>>>>> it to make this easier).
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>> It's also clear that even if we tie the
> > > cache to
> > > > > the
> > > > > > > > > session, we
> > > > > > > > > > >>>>>>>>>>>> still
> > > > > > > > > > >>>>>>>>>>>>>>> have to have limits on the number of
> caches
> > > we're
> > > > > > > > > willing to
> > > > > > > > > > >>>>>>>>>> create.
> > > > > > > > > > >>>>>>>>>>>>>>> And probably we should reserve some cache
> > > slots
> > > > > for
> > > > > > > each
> > > > > > > > > > >>>>>>>>>> follower, so
> > > > > > > > > > >>>>>>>>>>>>>>> that clients don't take all of them.
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>> Id rather see a protocol in which the
> > > client is
> > > > > > > hinting
> > > > > > > > > the
> > > > > > > > > > >>>>>>>>>> broker
> > > > > > > > > > >>>>>>>>>>>>>> that,
> > > > > > > > > > >>>>>>>>>>>>>>>> he is going to use the feature instead
> of a
> > > > > client
> > > > > > > > > > >>>>>>>>>>>>>>>> realizing that the broker just offered
> the
> > > > > feature
> > > > > > > > > (regardless
> > > > > > > > > > >>>>>>>>>> of
> > > > > > > > > > >>>>>>>>>>>>>>>> protocol version which should only
> indicate
> > > > > that the
> > > > > > > > > feature
> > > > > > > > > > >>>>>>>>>>>>>>>> would be usable).
> > > > > > > > > > >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by
> > > "hinting."
> > > > > I do
> > > > > > > > > think that
> > > > > > > > > > >>>>>>>>>> the
> > > > > > > > > > >>>>>>>>>>>>>>> server should have the option of not
> > > accepting
> > > > > > > > > incremental
> > > > > > > > > > >>>>>>>>>> requests
> > > > > > > > > > >>>>>>>>>>>> from
> > > > > > > > > > >>>>>>>>>>>>>>> specific clients, in order to save memory
> > > space.
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>> This seems to work better with a per
> > > > > > > > > > >>>>>>>>>>>>>>>> connection/session attached Metadata
> than
> > > with
> > > > > a Map
> > > > > > > > > and could
> > > > > > > > > > >>>>>>>>>>>> allow
> > > > > > > > > > >>>>>>>>>>>>>> for
> > > > > > > > > > >>>>>>>>>>>>>>>> easier client implementations.
> > > > > > > > > > >>>>>>>>>>>>>>>> It would also make Client-side code
> easier
> > > as
> > > > > there
> > > > > > > > > wouldn't
> > > > > > > > > > >>>>>>>>>> be any
> > > > > > > > > > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > > > > > > >>>>>>>>>>>>>>> It is nice not to have to handle
> cache-miss
> > > > > > > responses, I
> > > > > > > > > agree.
> > > > > > > > > > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to
> most
> > > of
> > > > > our
> > > > > > > > > client-side
> > > > > > > > > > >>>>>>>>>> code.
> > > > > > > > > > >>>>>>>>>>>>>>> For example, when the Producer creates a
> > > message
> > > > > and
> > > > > > > > > hands it
> > > > > > > > > > >>>>>>>>>> off to
> > > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently
> > > > > re-connect
> > > > > > > and
> > > > > > > > > re-send a
> > > > > > > > > > >>>>>>>>>>>>>>> message if the first send failed.  The
> > > > > higher-level
> > > > > > > code
> > > > > > > > > will
> > > > > > > > > > >>>>>>>>>> not be
> > > > > > > > > > >>>>>>>>>>>>>>> informed about whether the TCP session
> was
> > > > > > > > > re-established,
> > > > > > > > > > >>>>>>>>>> whether an
> > > > > > > > > > >>>>>>>>>>>>>>> existing TCP session was used, and so
> on.  So
> > > > > > > overall I
> > > > > > > > > would
> > > > > > > > > > >>>>>>>>>> still
> > > > > > > > > > >>>>>>>>>>>> lean
> > > > > > > > > > >>>>>>>>>>>>>>> towards not coupling this to the TCP
> > > session...
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>> best,
> > > > > > > > > > >>>>>>>>>>>>>>> Colin
> > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>       Thank you again for the KIP. And
> > > again, if
> > > > > > > this
> > > > > > > > > was clarified
> > > > > > > > > > >>>>>>>>>>>> already
> > > > > > > > > > >>>>>>>>>>>>>>>> please drop me a hint where I could read
> > > about
> > > > > it.
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>> Best Jan
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>> I created a KIP to improve the
> scalability
> > > and
> > > > > > > latency
> > > > > > > > > of
> > > > > > > > > > >>>>>>>>>>>>>> FetchRequest:
> > > > > > > > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> > > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+
> > > > > > > FetchRequests+to+Increase+
> > > > > > > > > > >>>>>>>>>>>>>> Partition+Scalability
> > > > > > > > > > >>>>>>>>>>>>>>>>> Please take a look.
> > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>>>> cheers,
> > > > > > > > > > >>>>>>>>>>>>>>>>> Colin
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message