kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Colin McCabe <cmcc...@apache.org>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Wed, 03 Jan 2018 17:37:26 GMT
On Tue, Jan 2, 2018, at 23:49, Becket Qin wrote:
> Thanks for the reply, Colin.
> 
> My concern for the reinitialization is potential churn rather than
> efficiency. The current KIP proposal uses the time and priority based
> protection to avoid thrashing, but it is not clear to me if that is
> sufficient. For example, consider topic creation/deletion. In those cases,
> a lot of the replica fetchers will potentially need to re-establish the
> session. And there might be many client session got evicted. And thus again
> need to re-establish sessions. This would involve two round trips (due to
> InvalidFetchSessionException), potential metadata refresh and backoff.

Hi Becket,

When a fetcher is adding or removing a partition, it can re-use its existing fetch session.  There is no cache churn, and nobody gets evicted, in this case.  The fetcher just has to send a full fetch request to establish what it wants the new partition set to be.

best,
Colin

> 
> Admittedly it is probably not going to be worse than what we have now, but
> such uncertain impact still worries me. Are we going to have the follow up
> optimization discussion before the implementation of this KIP or are we
> going to do it after? In the past we used to have separate KIPs for a
> complicated feature but implement them together. Perhaps we can do the same
> here if you want to limit the scope of this KIP.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> On Tue, Jan 2, 2018 at 6:34 PM, Colin McCabe <cmccabe@apache.org> wrote:
> 
> > On Tue, Jan 2, 2018, at 04:46, Becket Qin wrote:
> > > Hi Colin,
> > >
> > > Good point about KIP-226. Maybe a separate broker epoch is needed
> > although
> > > it is a little awkward to let the consumer set this. So was there a
> > > solution to the frequent pause and resume scenario? Did I miss something?
> > >
> > > Thanks,
> > > Jiangjie (Becket) Qin
> >
> > Hi Becket,
> >
> > Allowing sessions to be re-initialized (as the current KIP does) makes
> > frequent pauses and resumes less painful, because the memory associated
> > with the old session can be reclaimed.  The only cost is sending a full
> > fetch request once when the pause or resume is activated.
> >
> > There are other follow-on optimizations that we might want to do later,
> > like allowing new partitions to be added to existing fetch sessions without
> > a re-initialization, that could make this even more efficient.  But that's
> > not in the current KIP, in order to avoid expanding the scope too much.
> >
> > best,
> > Colin
> >
> > >
> > > On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe <cmccabe@apache.org>
> > wrote:
> > >
> > > > On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > > > > Hi Colin,
> > > > >
> > > > > Thanks for the explanation. I want to clarify a bit more on my
> > thoughts.
> > > > >
> > > > > I am fine with having a separate discussion as long as the follow-up
> > > > > discussion will be incremental on top of this KIP instead of
> > override the
> > > > > protocol in this KIP.
> > > >
> > > > Hi Becket,
> > > >
> > > > Thanks for the clarification.  I do think that the changes we've been
> > > > discussing would be incremental rather than completely replacing what
> > we've
> > > > talked about here.  See my responses inline below.
> > > >
> > > > >
> > > > > I completely agree this KIP is useful by itself. That being said, we
> > want
> > > > > to avoid falling into a "local optimal" solution by just saying
> > because
> > > > it
> > > > > solves the problem in this scope. I think we should also think if the
> > > > > solution aligns with a "global optimal" (systematic optimal)
> > solution as
> > > > > well. That is why I brought up other considerations. If they turned
> > out
> > > > to
> > > > > be orthogonal and should be addressed separately, that's good. But at
> > > > least
> > > > > it is worth thinking about the potential connections between those
> > > > things.
> > > > >
> > > > > One example of such related consideration is the following two
> > seemingly
> > > > > unrelated things:
> > > > >
> > > > > 1. I might have missed the discussion, but it seems the concern of
> > the
> > > > > clients doing frequent pause and resume is still not addressed. Since
> > > > this
> > > > > is a pretty common use case for applications that want to have flow
> > > > > control, or have prioritized consumption, or get consumption
> > fairness, we
> > > > > probably want to see how to handle this case. One of the solution
> > might
> > > > be
> > > > > a long-lived session id spanning the clients' life time.
> > > > >
> > > > > 2. KAFKA-6029. The key problem is that the leader wants to know if a
> > > > fetch
> > > > > request is from a shutting down broker or from a restarted broker.
> > > > >
> > > > > The connection between those two issues is that both of them could be
> > > > > addressed by having a life-long session id for each client (or
> > fetcher,
> > > > to
> > > > > be more accurate). This may indicate that having a life long session
> > id
> > > > > might be a "global optimal" solution so it should be considered in
> > this
> > > > > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
> > > > > introduce a broker epoch unnecessarily (which will not be used by the
> > > > > consumers at all) or override what we do in this KIP.
> > > >
> > > > Remember that a given follower will have more than one fetch session
> > ID.
> > > > Each fetcher thread will have its own session ID.  And we will
> > eventually
> > > > be able to dynamically add or remove fetcher threads using KIP-226.
> > > > Therefore, we can't use fetch session IDs to uniquely identify a given
> > > > broker incarnation.  Any time we increase the number of fetcher
> > threads, a
> > > > new fetch session ID will show up.
> > > >
> > > > If we want to know if a fetch request is from a shutting down broker or
> > > > from a restarted broker, the most straightforward and robust way would
> > > > probably be to add an incarnation number for each broker.  ZK can track
> > > > this number.  This also helps with debugging and logging (you can tell
> > > > "aha-- this request came from the second incarnation, not the first."
> > > >
> > > > > BTW, to clarify, the main purpose of returning the data at the index
> > > > > boundary was to get the same benefit of efficient incremental fetch
> > for
> > > > > both low vol and high vol partitions, which is directly related to
> > the
> > > > > primary goal in this KIP. The other things (such as avoiding binary
> > > > search)
> > > > > are just potential additional gain, and they are also brought up to
> > see
> > > > if
> > > > > that could be a "global optimal" solution.
> > > >
> > > > I still think these are separate.  The primary goal of the KIP was to
> > make
> > > > fetch requests where not all partitions are returning data more
> > efficient.
> > > > This isn't really related to the goal of trying to make accessing
> > > > historical data more efficient.  In most cases, the data we're
> > accessing is
> > > > very recent data, and index lookups are not an issue.
> > > >
> > > > >
> > > > > Some other replies below.
> > > > > >In order for improvements to succeed, I think that it's important to
> > > > > clearly define the scope and goals.  One good example of this was the
> > > > > AdminClient KIP.  We deliberately avoiding ?>discussing new
> > > > administrative
> > > > > RPCs in that KIP, in order to limit the scope.  This kept the
> > discussion
> > > > > focused on the user interfaces and configuration, rather than on the
> > > > > details of possible >new RPCs.  Once the KIP was completed, it was
> > easy
> > > > for
> > > > > us to add new RPCs later in separate KIPs.
> > > > > Hmm, why is AdminClient is related? All the discussion are about how
> > to
> > > > > make fetch more efficient, right?
> > > > >
> > > > > >Finally, it's not clear that the approach you are proposing is the
> > right
> > > > > way to go.  I think we would need to have a lot more discussion
> > about it.
> > > > > One very big disadvantage is that it couples >what we send back on
> > the
> > > > wire
> > > > > tightly to what is on the disk.  It's not clear that we want to do
> > that.
> > > > > What if we want to change how things are stored in the future?  How
> > does
> > > > > this work with >clients' own concept of fetch sizes?  And so on, and
> > so
> > > > > on.  This needs its own discussion thread.
> > > > > That might be true. However, the index file by definition is for the
> > > > files
> > > > > stored on the disk. So if we decide to change the storage layer to
> > > > > something else, it seems natural to use some other suitable ways to
> > get
> > > > the
> > > > > offsets efficiently.
> > > > >
> > > > > >There are a lot of simpler solutions that might work as well or
> > better.
> > > > > For example, each partition could keep an in-memory LRU cache of the
> > most
> > > > > recently used offset to file position >mappings.  Or we could have a
> > > > thread
> > > > > periodically touch the latest page or two of memory in the index
> > file for
> > > > > each partition, to make sure that it didn't fall out of the cache.
> > In
> > > > some
> > > > > offline >discussions, some of these approaches have looked quite
> > > > > promising.  I've even seen some good performance numbers for
> > prototypes.
> > > > > In any case, it's a separate problem which needs its >own KIP, I
> > think.
> > > > > Those are indeed separate discussions. I was not intended to discuss
> > them
> > > > > in this KIP. Sorry about the confusion.
> > > > >
> > > > > Thanks and Merry Christmas,
> > > >
> > > > Happy new year.  Sorry if some of my responses are delayed (I'm on
> > > > vacation).
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Sat, Dec 23, 2017 at 1:16 AM, Colin McCabe <cmccabe@apache.org>
> > > > wrote:
> > > > >
> > > > > > On Fri, Dec 22, 2017, at 14:31, Becket Qin wrote:
> > > > > > > >>
> > > > > > > >> The point I want to make is that avoiding doing binary search
> > on
> > > > index
> > > > > > > >> file and avoid reading the log segments during fetch has some
> > > > > > additional
> > > > > > > >> benefits. So if the solution works for the current KIP, it
> > might
> > > > be a
> > > > > > > >> better choice.
> > > > > > >
> > > > > > > >Let's discuss this in a follow-on KIP.
> > > > > > >
> > > > > > > If the discussion will potentially change the protocol in the
> > current
> > > > > > > proposal. Would it be better to discuss it now instead of in a
> > > > follow-up
> > > > > > > KIP so we don't have some protocol that immediately requires a
> > > > change.
> > > > > >
> > > > > > Hi Becket,
> > > > > >
> > > > > > I think that the problem that you are discussing is different than
> > the
> > > > > > problem this KIP is designed to address.  This KIP is targeted at
> > > > > > eliminating the wastefulness of re-transmitting information about
> > > > > > partitions that haven't changed in every FetchRequest and
> > > > FetchResponse.
> > > > > > The problem you are discussing is dealing with situations where the
> > > > index
> > > > > > file or the data file is not in the page cache, and therefore we
> > take a
> > > > > > page fault when doing an index lookup.
> > > > > >
> > > > > > This KIP is useful and valuable on its own.  For example, if you
> > have
> > > > > > brokers in a public cloud in different availability zones, you may
> > > > wish to
> > > > > > minimize the network traffic between them.  Therefore, you don't
> > want
> > > > every
> > > > > > FetchRequest between brokers to be a full FetchRequest.  In that
> > case,
> > > > this
> > > > > > KIP is very valuable.
> > > > > >
> > > > > > In order for improvements to succeed, I think that it's important
> > to
> > > > > > clearly define the scope and goals.  One good example of this was
> > the
> > > > > > AdminClient KIP.  We deliberately avoiding discussing new
> > > > administrative
> > > > > > RPCs in that KIP, in order to limit the scope.  This kept the
> > > > discussion
> > > > > > focused on the user interfaces and configuration, rather than on
> > the
> > > > > > details of possible new RPCs.  Once the KIP was completed, it was
> > easy
> > > > for
> > > > > > us to add new RPCs later in separate KIPs.
> > > > > >
> > > > > > While it's clear that there is probably even more we could do to
> > > > optimize
> > > > > > fetch requests, making them incremental seems like a good first
> > cut.  I
> > > > > > deliberately avoided changing the replication protocol in this KIP,
> > > > because
> > > > > > I think that it's a big enough change as-is.  If we want to change
> > the
> > > > > > replication protocol in the future, there is nothing preventing
> > us...
> > > > and
> > > > > > this change will be a useful starting point.
> > > > > >
> > > > > > Finally, it's not clear that the approach you are proposing is the
> > > > right
> > > > > > way to go.  I think we would need to have a lot more discussion
> > about
> > > > it.
> > > > > > One very big disadvantage is that it couples what we send back on
> > the
> > > > wire
> > > > > > tightly to what is on the disk.  It's not clear that we want to do
> > > > that.
> > > > > > What if we want to change how things are stored in the future?  How
> > > > does
> > > > > > this work with clients' own concept of fetch sizes?  And so on,
> > and so
> > > > on.
> > > > > > This needs its own discussion thread.
> > > > > >
> > > > > > There are a lot of simpler solutions that might work as well or
> > better.
> > > > > > For example, each partition could keep an in-memory LRU cache of
> > the
> > > > most
> > > > > > recently used offset to file position mappings.  Or we could have a
> > > > thread
> > > > > > periodically touch the latest page or two of memory in the index
> > file
> > > > for
> > > > > > each partition, to make sure that it didn't fall out of the
> > cache.  In
> > > > some
> > > > > > offline discussions, some of these approaches have looked quite
> > > > promising.
> > > > > > I've even seen some good performance numbers for prototypes.  In
> > any
> > > > case,
> > > > > > it's a separate problem which needs its own KIP, I think.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <colin@cmccabe.xyz
> > >
> > > > wrote:
> > > > > > >
> > > > > > > > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> > > > > > > > > Sorry for coming back at this so late.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 11.12.2017 07:12, Colin McCabe wrote:
> > > > > > > > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> > > > > > > > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > > > > > > > > >>> Hi,
> > > > > > > > > >>>
> > > > > > > > > >>> sorry for the late reply, busy times :-/
> > > > > > > > > >>>
> > > > > > > > > >>> I would ask you one thing maybe. Since the timeout
> > > > > > > > > >>> argument seems to be settled I have no further argument
> > > > > > > > > >>> form your side except the "i don't want to".
> > > > > > > > > >>>
> > > > > > > > > >>> Can you see that connection.max.idle.max is the exact
> > time
> > > > > > > > > >>> that expresses "We expect the client to be away for this
> > > > long,
> > > > > > > > > >>> and come back and continue"?
> > > > > > > > > >> Hi Jan,
> > > > > > > > > >>
> > > > > > > > > >> Sure, connection.max.idle.max is the exact time that we
> > want
> > > > to
> > > > > > keep
> > > > > > > > > >> around a TCP session.  TCP sessions are relatively cheap,
> > so
> > > > we
> > > > > > can
> > > > > > > > > >> afford to keep them around for 10 minutes by default.
> > > > Incremental
> > > > > > > > fetch
> > > > > > > > > >> state is less cheap, so we want to set a shorter timeout
> > for
> > > > it.
> > > > > > We
> > > > > > > > > >> also want new TCP sessions to be able to reuse an existing
> > > > > > incremental
> > > > > > > > > >> fetch session rather than creating a new one and waiting
> > for
> > > > the
> > > > > > old
> > > > > > > > one
> > > > > > > > > >> to time out.
> > > > > > > > > >>
> > > > > > > > > >>> also clarified some stuff inline
> > > > > > > > > >>>
> > > > > > > > > >>> Best Jan
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>> On 05.12.2017 23:14, Colin McCabe wrote:
> > > > > > > > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > > > >>>>> Hi Colin
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Addressing the topic of how to manage slots from the
> > other
> > > > > > thread.
> > > > > > > > > >>>>> With tcp connections all this comes for free
> > essentially.
> > > > > > > > > >>>> Hi Jan,
> > > > > > > > > >>>>
> > > > > > > > > >>>> I don't think that it's accurate to say that cache
> > > > management
> > > > > > > > "comes for
> > > > > > > > > >>>> free" by coupling the incremental fetch session with
> > the TCP
> > > > > > > > session.
> > > > > > > > > >>>> When a new TCP session is started by a fetch request,
> > you
> > > > still
> > > > > > > > have to
> > > > > > > > > >>>> decide whether to grant that request an incremental
> > fetch
> > > > > > session or
> > > > > > > > > >>>> not.  If your answer is that you always grant the
> > request, I
> > > > > > would
> > > > > > > > argue
> > > > > > > > > >>>> that you do not have cache management.
> > > > > > > > > >>> First I would say, the client has a big say in this. If
> > the
> > > > > > client
> > > > > > > > > >>> is not going to issue incremental he shouldn't ask for a
> > > > cache
> > > > > > > > > >>> when the client ask for the cache we still have all
> > options
> > > > to
> > > > > > deny.
> > > > > > > > > >> To put it simply, we have to have some cache management
> > above
> > > > and
> > > > > > > > beyond
> > > > > > > > > >> just giving out an incremental fetch session to anyone who
> > > > has a
> > > > > > TCP
> > > > > > > > > >> session.  Therefore, caching does not become simpler if
> > you
> > > > > > couple the
> > > > > > > > > >> fetch session to the TCP session.
> > > > > > > > > Simply giving out an fetch session for everyone with a
> > > > connection is
> > > > > > too
> > > > > > > > > simple,
> > > > > > > > > but I think it plays well into the idea of consumers
> > choosing to
> > > > use
> > > > > > the
> > > > > > > > > feature
> > > > > > > > > therefore only enabling where it brings maximum gains
> > > > > > > > > (replicas,MirrorMakers)
> > > > > > > > > >>
> > > > > > > > > >>>> I guess you could argue that timeouts are cache
> > management,
> > > > but
> > > > > > I
> > > > > > > > don't
> > > > > > > > > >>>> find that argument persuasive.  Anyone could just
> > create a
> > > > lot
> > > > > > of
> > > > > > > > TCP
> > > > > > > > > >>>> sessions and use a lot of resources, in that case.  So
> > > > there is
> > > > > > > > > >>>> essentially no limit on memory use.  In any case, TCP
> > > > sessions
> > > > > > don't
> > > > > > > > > >>>> help us implement fetch session timeouts.
> > > > > > > > > >>> We still have all the options denying the request to
> > keep the
> > > > > > state.
> > > > > > > > > >>> What you want seems like a max connections / ip
> > safeguard.
> > > > > > > > > >>> I can currently take down a broker with to many
> > connections
> > > > > > easily.
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>>>> I still would argue we disable it by default and make a
> > > > flag
> > > > > > in the
> > > > > > > > > >>>>> broker to ask the leader to maintain the cache while
> > > > > > replicating
> > > > > > > > and also only
> > > > > > > > > >>>>> have it optional in consumers (default to off) so one
> > can
> > > > turn
> > > > > > it
> > > > > > > > on
> > > > > > > > > >>>>> where it really hurts.  MirrorMaker and audit consumers
> > > > > > > > prominently.
> > > > > > > > > >>>> I agree with Jason's point from earlier in the thread.
> > > > Adding
> > > > > > extra
> > > > > > > > > >>>> configuration knobs that aren't really necessary can
> > harm
> > > > > > usability.
> > > > > > > > > >>>> Certainly asking people to manually turn on a feature
> > > > "where it
> > > > > > > > really
> > > > > > > > > >>>> hurts" seems to fall in that category, when we could
> > easily
> > > > > > enable
> > > > > > > > it
> > > > > > > > > >>>> automatically for them.
> > > > > > > > > >>> This doesn't make much sense to me.
> > > > > > > > > >> There are no tradeoffs to think about from the client's
> > point
> > > > of
> > > > > > view:
> > > > > > > > > >> it always wants an incremental fetch session.  So there
> > is no
> > > > > > benefit
> > > > > > > > to
> > > > > > > > > >> making the clients configure an extra setting.  Updating
> > and
> > > > > > managing
> > > > > > > > > >> client configurations is also more difficult than managing
> > > > broker
> > > > > > > > > >> configurations for most users.
> > > > > > > > > >>
> > > > > > > > > >>> You also wanted to implement
> > > > > > > > > >>> a "turn of in case of bug"-knob. Having the client
> > indicate
> > > > if
> > > > > > the
> > > > > > > > > >>> feauture will be used seems reasonable to me.,
> > > > > > > > > >> True.  However, if there is a bug, we could also roll
> > back the
> > > > > > client,
> > > > > > > > > >> so having this configuration knob is not strictly
> > required.
> > > > > > > > > >>
> > > > > > > > > >>>>> Otherwise I left a few remarks in-line, which should
> > help
> > > > to
> > > > > > > > understand
> > > > > > > > > >>>>> my view of the situation better
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Best Jan
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > > > > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > > > > >>>>>>>>> Thanks for the explanation, Colin. A few more
> > > > questions.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>> The session epoch is not complex.  It's just a
> > number
> > > > > > which
> > > > > > > > increments
> > > > > > > > > >>>>>>>>>> on each incremental fetch.  The session epoch is
> > also
> > > > > > useful
> > > > > > > > for
> > > > > > > > > >>>>>>>>>> debugging-- it allows you to match up requests and
> > > > > > responses
> > > > > > > > when
> > > > > > > > > >>>>>>>>>> looking at log files.
> > > > > > > > > >>>>>>>>> Currently each request in Kafka has a correlation
> > id to
> > > > > > help
> > > > > > > > match the
> > > > > > > > > >>>>>>>>> requests and responses. Is epoch doing something
> > > > > > differently?
> > > > > > > > > >>>>>>>> Hi Becket,
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> The correlation ID is used within a single TCP
> > session,
> > > > to
> > > > > > > > uniquely
> > > > > > > > > >>>>>>>> associate a request with a response.  The
> > correlation
> > > > ID is
> > > > > > not
> > > > > > > > unique
> > > > > > > > > >>>>>>>> (and has no meaning) outside the context of that
> > single
> > > > TCP
> > > > > > > > session.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP
> > > > sessions,
> > > > > > and
> > > > > > > > generally
> > > > > > > > > >>>>>>>> tries to hide that information from the upper
> > layers of
> > > > the
> > > > > > > > code.  So
> > > > > > > > > >>>>>>>> when you submit a request to NetworkClient, you
> > don't
> > > > know
> > > > > > if
> > > > > > > > that
> > > > > > > > > >>>>>>>> request creates a TCP session, or reuses an existing
> > > > one.
> > > > > > > > > >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the
> > client
> > > > > > misses
> > > > > > > > an
> > > > > > > > > >>>>>>>>>> increment fetch response about a partition.  And
> > then
> > > > the
> > > > > > > > partition is
> > > > > > > > > >>>>>>>>>> never updated after that.  The client has no way
> > to
> > > > know
> > > > > > > > about the
> > > > > > > > > >>>>>>>>>> partition, since it won't be included in any
> > future
> > > > > > > > incremental fetch
> > > > > > > > > >>>>>>>>>> responses.  And there are no offsets to compare,
> > > > since the
> > > > > > > > partition is
> > > > > > > > > >>>>>>>>>> simply omitted from the response.
> > > > > > > > > >>>>>>>>> I am curious about in which situation would the
> > > > follower
> > > > > > miss
> > > > > > > > a response
> > > > > > > > > >>>>>>>>> of a partition. If the entire FetchResponse is lost
> > > > (e.g.
> > > > > > > > timeout), the
> > > > > > > > > >>>>>>>>> follower would disconnect and retry. That will
> > result
> > > > in
> > > > > > > > sending a full
> > > > > > > > > >>>>>>>>> FetchRequest.
> > > > > > > > > >>>>>>>> Basically, you are proposing that we rely on TCP for
> > > > > > reliable
> > > > > > > > delivery
> > > > > > > > > >>>>>>>> in a distributed system.  That isn't a good idea
> > for a
> > > > > > bunch of
> > > > > > > > > >>>>>>>> different reasons.  First of all, TCP timeouts tend
> > to
> > > > be
> > > > > > very
> > > > > > > > long.  So
> > > > > > > > > >>>>>>>> if the TCP session timing out is your error
> > detection
> > > > > > > > mechanism, you
> > > > > > > > > >>>>>>>> have to wait minutes for messages to timeout.  Of
> > > > course, we
> > > > > > > > add a
> > > > > > > > > >>>>>>>> timeout on top of that after which we declare the
> > > > connection
> > > > > > > > bad and
> > > > > > > > > >>>>>>>> manually close it.  But just because the session is
> > > > closed
> > > > > > on
> > > > > > > > one end
> > > > > > > > > >>>>>>>> doesn't mean that the other end knows that it is
> > > > closed.  So
> > > > > > > > the leader
> > > > > > > > > >>>>>>>> may have to wait quite a long time before TCP
> > decides
> > > > that
> > > > > > yes,
> > > > > > > > > >>>>>>>> connection X from the follower is dead and not
> > coming
> > > > back,
> > > > > > > > even though
> > > > > > > > > >>>>>>>> gremlins ate the FIN packet which the follower
> > > > attempted to
> > > > > > > > translate.
> > > > > > > > > >>>>>>>> If the cache state is tied to that TCP session, we
> > have
> > > > to
> > > > > > keep
> > > > > > > > that
> > > > > > > > > >>>>>>>> cache around for a much longer time than we should.
> > > > > > > > > >>>>>>> Hi,
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> I see this from a different perspective. The cache
> > expiry
> > > > > > time
> > > > > > > > > >>>>>>> has the same semantic as idle connection time in this
> > > > > > scenario.
> > > > > > > > > >>>>>>> It is the time range we expect the client to come
> > back an
> > > > > > reuse
> > > > > > > > > >>>>>>> its broker side state. I would argue that on close we
> > > > would
> > > > > > get
> > > > > > > > an
> > > > > > > > > >>>>>>> extra shot at cleaning up the session state early. As
> > > > > > opposed to
> > > > > > > > > >>>>>>> always wait for that duration for expiry to happen.
> > > > > > > > > >>>>>> Hi Jan,
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> The idea here is that the incremental fetch cache
> > expiry
> > > > time
> > > > > > can
> > > > > > > > be
> > > > > > > > > >>>>>> much shorter than the TCP session timeout.  In general
> > > > the TCP
> > > > > > > > session
> > > > > > > > > >>>>>> timeout is common to all TCP connections, and very
> > long.
> > > > To
> > > > > > make
> > > > > > > > these
> > > > > > > > > >>>>>> numbers a little more concrete, the TCP session
> > timeout is
> > > > > > often
> > > > > > > > > >>>>>> configured to be 2 hours on Linux.  (See
> > > > > > > > > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or-
> > > > > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > > > > >>>>>> )  The timeout I was proposing for incremental fetch
> > > > sessions
> > > > > > was
> > > > > > > > one or
> > > > > > > > > >>>>>> two minutes at most.
> > > > > > > > > >>>>> Currently this is taken care of by
> > > > > > > > > >>>>> connections.max.idle.ms on the broker and defaults to
> > > > > > something
> > > > > > > > of few
> > > > > > > > > >>>>> minutes.
> > > > > > > > > >>>> It is 10 minutes by default, which is longer than what
> > we
> > > > want
> > > > > > the
> > > > > > > > > >>>> incremental fetch session timeout to be.  There's no
> > reason
> > > > to
> > > > > > > > couple
> > > > > > > > > >>>> these two things.
> > > > > > > > > >>>>
> > > > > > > > > >>>>> Also something we could let the client change if we
> > really
> > > > > > wanted
> > > > > > > > to.
> > > > > > > > > >>>>> So there is no need to worry about coupling our
> > > > implementation
> > > > > > to
> > > > > > > > some
> > > > > > > > > >>>>> timeouts given by the OS, with TCP one always has full
> > > > control
> > > > > > > > over the worst
> > > > > > > > > >>>>> times + one gets the extra shot cleaning up early when
> > the
> > > > > > close
> > > > > > > > comes through.
> > > > > > > > > >>>>> Which is the majority of the cases.
> > > > > > > > > >>>> In the majority of cases, the TCP session will be
> > > > > > re-established.
> > > > > > > > In
> > > > > > > > > >>>> that case, we have to send a full fetch request rather
> > than
> > > > an
> > > > > > > > > >>>> incremental fetch request.
> > > > > > > > > >>> I actually have a hard time believing this. Do you have
> > any
> > > > > > numbers
> > > > > > > > of
> > > > > > > > > >>> any existing production system? Is it the virtualisation
> > > > layer
> > > > > > > > cutting
> > > > > > > > > >>> all the connections?
> > > > > > > > > >>> We see this only on application crashes and restarts
> > where
> > > > the
> > > > > > app
> > > > > > > > needs
> > > > > > > > > >>> todo the full anyways
> > > > > > > > > >>> as it probably continues with stores offsets.
> > > > > > > > > >> Yes, TCP connections get dropped.  It happens very often
> > in
> > > > > > production
> > > > > > > > > >> clusters, actually.  When I was working on Hadoop, one of
> > the
> > > > most
> > > > > > > > > >> common questions I heard from newcomers was "why do I see
> > so
> > > > many
> > > > > > > > > >> EOFException messages in the logs"?  The other thing that
> > > > happens
> > > > > > a
> > > > > > > > lot
> > > > > > > > > >> is DNS outages or slowness.  Public clouds seem to have
> > even
> > > > more
> > > > > > > > > >> unstable networks than the on-premise clusters.  I am not
> > > > sure why
> > > > > > > > that
> > > > > > > > > >> is.
> > > > > > > > > Hadoop has a wiki page on exactly this
> > > > > > > > > https://wiki.apache.org/hadoop/EOFException
> > > > > > > > >
> > > > > > > > > besides user errors they have servers crashing and actually
> > loss
> > > > of
> > > > > > > > > connection high on their list.
> > > > > > > > > In the case of "server goes away" the cache goes with it. So
> > > > nothing
> > > > > > to
> > > > > > > > > argue about the cache beeing reused by
> > > > > > > > > a new connection.
> > > > > > > > >
> > > > > > > > > Can you make an argument at which point the epoch would be
> > > > updated
> > > > > > > > > broker side to maximise re-usage of the cache on
> > > > > > > > > lost connections. In many cases the epoch would go out of
> > sync
> > > > and we
> > > > > > > > > would need a full fetch anyways. Am I mistaken here?
> > > > > > > >
> > > > > > > > The current proposal is that the server can accept multiple
> > > > requests
> > > > > > in a
> > > > > > > > row with the same sequence number.
> > > > > > > >
> > > > > > > > Colin
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >>
> > > > > > > > > >>>>>>>> Secondly, from a software engineering perspective,
> > it's
> > > > not
> > > > > > a
> > > > > > > > good idea
> > > > > > > > > >>>>>>>> to try to tightly tie together TCP and our code.  We
> > > > would
> > > > > > have
> > > > > > > > to
> > > > > > > > > >>>>>>>> rework how we interact with NetworkClient so that
> > we are
> > > > > > aware
> > > > > > > > of things
> > > > > > > > > >>>>>>>> like TCP sessions closing or opening.  We would
> > have to
> > > > be
> > > > > > > > careful
> > > > > > > > > >>>>>>>> preserve the ordering of incoming messages when
> > doing
> > > > things
> > > > > > > > like
> > > > > > > > > >>>>>>>> putting incoming requests on to a queue to be
> > processed
> > > > by
> > > > > > > > multiple
> > > > > > > > > >>>>>>>> threads.  It's just a lot of complexity to add, and
> > > > there's
> > > > > > no
> > > > > > > > upside.
> > > > > > > > > >>>>>>> I see the point here. And I had a small chat with
> > Dong
> > > > Lin
> > > > > > > > already
> > > > > > > > > >>>>>>> making me aware of this. I tried out the approaches
> > and
> > > > > > propose
> > > > > > > > the
> > > > > > > > > >>>>>>> following:
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> The client start and does a full fetch. It then does
> > > > > > incremental
> > > > > > > > fetches.
> > > > > > > > > >>>>>>> The connection to the broker dies and is
> > re-established
> > > > by
> > > > > > > > NetworkClient
> > > > > > > > > >>>>>>> under the hood.
> > > > > > > > > >>>>>>> The broker sees an incremental fetch without having
> > > > state =>
> > > > > > > > returns
> > > > > > > > > >>>>>>> error:
> > > > > > > > > >>>>>>> Client sees the error, does a full fetch and goes
> > back to
> > > > > > > > incrementally
> > > > > > > > > >>>>>>> fetching.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> having this 1 additional error round trip is
> > essentially
> > > > the
> > > > > > > > same as
> > > > > > > > > >>>>>>> when something
> > > > > > > > > >>>>>>> with the sessions or epoch changed unexpectedly to
> > the
> > > > client
> > > > > > > > (say
> > > > > > > > > >>>>>>> expiry).
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> So its nothing extra added but the conditions are
> > easier
> > > > to
> > > > > > > > evaluate.
> > > > > > > > > >>>>>>> Especially since we do everything with NetworkClient.
> > > > Other
> > > > > > > > implementers
> > > > > > > > > >>>>>>> on the
> > > > > > > > > >>>>>>> protocol are free to optimizes this and do not do the
> > > > > > errornours
> > > > > > > > > >>>>>>> roundtrip on the
> > > > > > > > > >>>>>>> new connection.
> > > > > > > > > >>>>>>> Its a great plus that the client can know when the
> > error
> > > > is
> > > > > > gonna
> > > > > > > > > >>>>>>> happen. instead of
> > > > > > > > > >>>>>>> the server to always have to report back if something
> > > > changes
> > > > > > > > > >>>>>>> unexpectedly for the client
> > > > > > > > > >>>>>> You are assuming that the leader and the follower
> > agree
> > > > that
> > > > > > the
> > > > > > > > TCP
> > > > > > > > > >>>>>> session drops at the same time.  When there are
> > network
> > > > > > problems,
> > > > > > > > this
> > > > > > > > > >>>>>> may not be true.  The leader may still think the
> > previous
> > > > TCP
> > > > > > > > session is
> > > > > > > > > >>>>>> active.  In that case, we have to keep the incremental
> > > > fetch
> > > > > > > > session
> > > > > > > > > >>>>>> state around until we learn otherwise (which could be
> > up
> > > > to
> > > > > > that
> > > > > > > > 2 hour
> > > > > > > > > >>>>>> timeout I mentioned).  And if we get a new incoming
> > > > > > incremental
> > > > > > > > fetch
> > > > > > > > > >>>>>> request, we can't assume that it replaces the previous
> > > > one,
> > > > > > > > because the
> > > > > > > > > >>>>>> IDs will be different (the new one starts a new
> > session).
> > > > > > > > > >>>>> As mentioned, no reason to fear some time-outs out of
> > our
> > > > > > control
> > > > > > > > > >>>>>>>> Imagine that I made an argument that client IDs are
> > > > > > "complex"
> > > > > > > > and should
> > > > > > > > > >>>>>>>> be removed from our APIs.  After all, we can just
> > look
> > > > at
> > > > > > the
> > > > > > > > remote IP
> > > > > > > > > >>>>>>>> address and TCP port of each connection.  Would you
> > > > think
> > > > > > that
> > > > > > > > was a
> > > > > > > > > >>>>>>>> good idea?  The client ID is useful when looking at
> > > > logs.
> > > > > > For
> > > > > > > > example,
> > > > > > > > > >>>>>>>> if a rebalance is having problems, you want to know
> > what
> > > > > > > > clients were
> > > > > > > > > >>>>>>>> having a problem.  So having the client ID field to
> > > > guide
> > > > > > you is
> > > > > > > > > >>>>>>>> actually much less "complex" in practice than not
> > > > having an
> > > > > > ID.
> > > > > > > > > >>>>>>> I still cant follow why the correlation idea will not
> > > > help
> > > > > > here.
> > > > > > > > > >>>>>>> Correlating logs with it usually works great. Even
> > with
> > > > > > > > primitive tools
> > > > > > > > > >>>>>>> like grep
> > > > > > > > > >>>>>> The correlation ID does help somewhat, but certainly
> > not
> > > > as
> > > > > > much
> > > > > > > > as a
> > > > > > > > > >>>>>> unique 64-bit ID.  The correlation ID is not unique
> > in the
> > > > > > > > broker, just
> > > > > > > > > >>>>>> unique to a single NetworkClient.  Simiarly, the
> > > > correlation
> > > > > > ID
> > > > > > > > is not
> > > > > > > > > >>>>>> unique on the client side, if there are multiple
> > > > Consumers,
> > > > > > etc.
> > > > > > > > > >>>>> Can always bump entropy in correlation IDs, never had a
> > > > problem
> > > > > > > > > >>>>> of finding to many duplicates. Would be a different KIP
> > > > though.
> > > > > > > > > >>>>>>>> Similarly, if metadata responses had epoch numbers
> > > > (simple
> > > > > > > > incrementing
> > > > > > > > > >>>>>>>> numbers), we would not have to debug problems like
> > > > clients
> > > > > > > > accidentally
> > > > > > > > > >>>>>>>> getting old metadata from servers that had been
> > > > partitioned
> > > > > > off
> > > > > > > > from the
> > > > > > > > > >>>>>>>> network for a while.  Clients would know the
> > difference
> > > > > > between
> > > > > > > > old and
> > > > > > > > > >>>>>>>> new metadata.  So putting epochs in to the metadata
> > > > request
> > > > > > is
> > > > > > > > much less
> > > > > > > > > >>>>>>>> "complex" operationally, even though it's an extra
> > > > field in
> > > > > > the
> > > > > > > > request.
> > > > > > > > > >>>>>>>>      This has been discussed before on the mailing
> > list.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> So I think the bottom line for me is that having the
> > > > > > session ID
> > > > > > > > and
> > > > > > > > > >>>>>>>> session epoch, while it adds two extra fields,
> > reduces
> > > > > > > > operational
> > > > > > > > > >>>>>>>> complexity and increases debuggability.  It avoids
> > > > tightly
> > > > > > > > coupling us
> > > > > > > > > >>>>>>>> to assumptions about reliable ordered delivery which
> > > > tend
> > > > > > to be
> > > > > > > > violated
> > > > > > > > > >>>>>>>> in practice in multiple layers of the stack.
> > Finally,
> > > > it
> > > > > > > > avoids the
> > > > > > > > > >>>>>>>> necessity of refactoring NetworkClient.
> > > > > > > > > >>>>>>> So there is stacks out there that violate TCP
> > > > guarantees? And
> > > > > > > > software
> > > > > > > > > >>>>>>> still works? How can this be? Can you elaborate a
> > little
> > > > > > where
> > > > > > > > this
> > > > > > > > > >>>>>>> can be violated? I am not very familiar with
> > virtualized
> > > > > > > > environments
> > > > > > > > > >>>>>>> but they can't really violate TCP contracts.
> > > > > > > > > >>>>>> TCP's guarantees of reliable, in-order transmission
> > > > certainly
> > > > > > can
> > > > > > > > be
> > > > > > > > > >>>>>> violated.  For example, I once had to debug a cluster
> > > > where a
> > > > > > > > certain
> > > > > > > > > >>>>>> node had a network card which corrupted its
> > transmissions
> > > > > > > > occasionally.
> > > > > > > > > >>>>>> With all the layers of checksums, you would think that
> > > > this
> > > > > > was
> > > > > > > > not
> > > > > > > > > >>>>>> possible, but it happened.  We occasionally got
> > corrupted
> > > > data
> > > > > > > > written
> > > > > > > > > >>>>>> to disk on the other end because of it.  Even more
> > > > > > frustrating,
> > > > > > > > the data
> > > > > > > > > >>>>>> was not corrupted on disk on the sending node-- it
> > was a
> > > > bug
> > > > > > in
> > > > > > > > the
> > > > > > > > > >>>>>> network card driver that was injecting the errors.
> > > > > > > > > >>>>> true, but your broker might aswell read a corrupted
> > 600GB
> > > > as
> > > > > > size
> > > > > > > > from
> > > > > > > > > >>>>> the network and die with OOM instantly.
> > > > > > > > > >>>> If you read 600 GB as the size from the network, you
> > will
> > > > not
> > > > > > "die
> > > > > > > > with
> > > > > > > > > >>>> OOM instantly."  That would be a bug.  Instead, you will
> > > > notice
> > > > > > > > that 600
> > > > > > > > > >>>> GB is greater than max.message.bytes, and close the
> > > > connection.
> > > > > > > > > >>> We only check max.message.bytes to late to guard against
> > > > consumer
> > > > > > > > > >>> stalling.
> > > > > > > > > >>> we dont have a notion of max.networkpacket.size before we
> > > > > > allocate
> > > > > > > > the
> > > > > > > > > >>> bytebuffer to read it into.
> > > > > > > > > >> "network packets" are not the same thing as "kafka
> > RPCs."  One
> > > > > > Kafka
> > > > > > > > RPC
> > > > > > > > > >> could take up mutiple ethernet packets.
> > > > > > > > > >>
> > > > > > > > > >> Also, max.message.bytes has nothing to do with "consumer
> > > > > > stalling" --
> > > > > > > > > >> you are probably thinking about some of the fetch request
> > > > > > > > > >> configurations.  max.message.bytes is used by the RPC
> > system
> > > > to
> > > > > > figure
> > > > > > > > > >> out whether to read the full incoming RP
> > > > > > > > > > Whoops, this is incorrect.  I was thinking about
> > > > > > > > > > "socket.request.max.bytes" rather than "max.message.bytes."
> > > > Sorry
> > > > > > > > about
> > > > > > > > > > that.  See Ismael's email as well.
> > > > > > > > > >
> > > > > > > > > > best,
> > > > > > > > > > Colin
> > > > > > > > > >
> > > > > > > > > >> best,
> > > > > > > > > >> Colin
> > > > > > > > > >>
> > > > > > > > > >>>>> Optimizing for still having functional
> > > > > > > > > >>>>> software under this circumstances is not reasonable.
> > > > > > > > > >>>>> You want to get rid of such a
> > > > > > > > > >>>>> node ASAP and pray that zookeepers ticks get corrupted
> > > > often
> > > > > > enough
> > > > > > > > > >>>>> that it finally drops out of the cluster.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> There is a good reason that these kinda things
> > > > > > > > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > > > > >>>>> don't end up as kafka Jiras. In the end you can't run
> > any
> > > > > > software
> > > > > > > > in
> > > > > > > > > >>>>> these containers anymore. Application layer checksums
> > are a
> > > > > > neat
> > > > > > > > thing to
> > > > > > > > > >>>>> fail fast but trying to cope with this probably causes
> > > > more bad
> > > > > > > > than
> > > > > > > > > >>>>> good.  So I would argue that we shouldn't try this for
> > the
> > > > > > fetch
> > > > > > > > requests.
> > > > > > > > > >>>> One of the goals of Apache Kafka is to be "a streaming
> > > > > > platform...
> > > > > > > > > >>>> [that] lets you store streams of records in a
> > fault-tolerant
> > > > > > way."
> > > > > > > > For
> > > > > > > > > >>>> more information, see https://kafka.apache.org/intro .
> > > > > > > > Fault-tolerance
> > > > > > > > > >>>> is explicitly part of the goal of Kafka.  Prayer should
> > be
> > > > > > > > optional, not
> > > > > > > > > >>>> required, when running the software.
> > > > > > > > > >>> Yes, we need to fail ASAP when we read corrupted
> > packages. It
> > > > > > seemed
> > > > > > > > > >>> to me like you tried to make the case for pray and try to
> > > > stay
> > > > > > alive.
> > > > > > > > > >>> Fault
> > > > > > > > > >>> tolerance here means. I am a fishy box i am going to let
> > a
> > > > good
> > > > > > box
> > > > > > > > > >>> handle
> > > > > > > > > >>> it and be silent until i get fixed up.
> > > > > > > > > >>>> Crashing because someone sent you a bad packet is not
> > > > reasonable
> > > > > > > > > >>>> behavior.  It is a bug.  Similarly, bringing down the
> > whole
> > > > > > cluster,
> > > > > > > > > >>>> which could a hundred nodes, because someone had a bad
> > > > network
> > > > > > > > adapter
> > > > > > > > > >>>> is not reasonable behavior.  It is perhaps reasonable
> > for
> > > > the
> > > > > > > > cluster to
> > > > > > > > > >>>> perform worse when hardware is having problems.  But
> > that's
> > > > a
> > > > > > > > different
> > > > > > > > > >>>> discussion.
> > > > > > > > > >>> See above.
> > > > > > > > > >>>> best,
> > > > > > > > > >>>> Colin
> > > > > > > > > >>>>
> > > > > > > > > >>>>>> However, my point was not about TCP's guarantees being
> > > > > > violated.
> > > > > > > > My
> > > > > > > > > >>>>>> point is that TCP's guarantees are only one small
> > building
> > > > > > block
> > > > > > > > to
> > > > > > > > > >>>>>> build a robust distributed system.  TCP basically just
> > > > says
> > > > > > that
> > > > > > > > if you
> > > > > > > > > >>>>>> get any bytes from the stream, you will get the ones
> > that
> > > > were
> > > > > > > > sent by
> > > > > > > > > >>>>>> the sender, in the order they were sent.  TCP does not
> > > > > > guarantee
> > > > > > > > that
> > > > > > > > > >>>>>> the bytes you send will get there.  It does not
> > guarantee
> > > > > > that if
> > > > > > > > you
> > > > > > > > > >>>>>> close the connection, the other end will know about
> > it in
> > > > a
> > > > > > timely
> > > > > > > > > >>>>>> fashion.
> > > > > > > > > >>>>> These are very powerful grantees and since we use TCP
> > we
> > > > should
> > > > > > > > > >>>>> piggy pack everything that is reasonable on to it. IMO
> > > > there
> > > > > > is no
> > > > > > > > > >>>>> need to reimplement correct sequencing again if you get
> > > > that
> > > > > > from
> > > > > > > > > >>>>> your transport layer. It saves you the complexity, it
> > makes
> > > > > > > > > >>>>> you application behave way more naturally and your api
> > > > easier
> > > > > > to
> > > > > > > > > >>>>> understand.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> There is literally nothing the Kernel wont let you
> > decide
> > > > > > > > > >>>>> especially not any timings. Only noticeable exception
> > being
> > > > > > > > TIME_WAIT
> > > > > > > > > >>>>> of usually 240 seconds but that already has little todo
> > > > with
> > > > > > the
> > > > > > > > broker
> > > > > > > > > >>>>> itself and
> > > > > > > > > >>>>> if we are running out of usable ports because of this
> > then
> > > > > > expiring
> > > > > > > > > >>>>> fetch requests
> > > > > > > > > >>>>> wont help much anyways.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> I hope I could strengthen the trust you have in
> > userland
> > > > TCP
> > > > > > > > connection
> > > > > > > > > >>>>> management. It is really powerful and can be exploited
> > for
> > > > > > maximum
> > > > > > > > gains
> > > > > > > > > >>>>> without much risk in my opinion.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>> It does not guarantee that the bytes will be received
> > in a
> > > > > > > > > >>>>>> certain timeframe, and certainly doesn't guarantee
> > that
> > > > if you
> > > > > > > > send a
> > > > > > > > > >>>>>> byte on connection X and then on connection Y, that
> > the
> > > > remote
> > > > > > > > end will
> > > > > > > > > >>>>>> read a byte on X before reading a byte on Y.
> > > > > > > > > >>>>> Noone expects this from two independent paths of any
> > kind.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>> best,
> > > > > > > > > >>>>>> Colin
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>> Hope this made my view clearer, especially the first
> > > > part.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> Best Jan
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>>> best,
> > > > > > > > > >>>>>>>> Colin
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>> If there is an error such as NotLeaderForPartition
> > is
> > > > > > > > > >>>>>>>>> returned for some partitions, the follower can
> > always
> > > > send
> > > > > > a
> > > > > > > > full
> > > > > > > > > >>>>>>>>> FetchRequest. Is there a scenario that only some
> > of the
> > > > > > > > partitions in a
> > > > > > > > > >>>>>>>>> FetchResponse is lost?
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Thanks,
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Jiangjie (Becket) Qin
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > > > > cmccabe@apache.org>  wrote:
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > > > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > > > > > > cmccabe@apache.org>
> > > > > > > > > >>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > > > > > >>>>>>>>>>>>> Hey Colin,
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> Thanks much for the update. I have a few
> > questions
> > > > > > below:
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch
> > Session
> > > > > > Epoch. It
> > > > > > > > seems that
> > > > > > > > > >>>>>>>>>>>>> Fetch
> > > > > > > > > >>>>>>>>>>>>> Session Epoch is only needed to help leader
> > > > distinguish
> > > > > > > > between "a
> > > > > > > > > >>>>>>>>>> full
> > > > > > > > > >>>>>>>>>>>>> fetch request" and "a full fetch request and
> > > > request a
> > > > > > new
> > > > > > > > > >>>>>>>>>> incremental
> > > > > > > > > >>>>>>>>>>>>> fetch session". Alternatively, follower can
> > also
> > > > > > indicate
> > > > > > > > "a full
> > > > > > > > > >>>>>>>>>> fetch
> > > > > > > > > >>>>>>>>>>>>> request and request a new incremental fetch
> > > > session" by
> > > > > > > > setting Fetch
> > > > > > > > > >>>>>>>>>>>>> Session ID to -1 without using Fetch Session
> > Epoch.
> > > > > > Does
> > > > > > > > this make
> > > > > > > > > >>>>>>>>>> sense?
> > > > > > > > > >>>>>>>>>>>> Hi Dong,
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> The fetch session epoch is very important for
> > > > ensuring
> > > > > > > > correctness.  It
> > > > > > > > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due
> > to
> > > > > > network
> > > > > > > > reordering
> > > > > > > > > >>>>>>>>>> or
> > > > > > > > > >>>>>>>>>>>> loss.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> For example, consider a scenario where the
> > follower
> > > > > > sends a
> > > > > > > > fetch
> > > > > > > > > >>>>>>>>>>>> request to the leader.  The leader responds,
> > but the
> > > > > > > > response is lost
> > > > > > > > > >>>>>>>>>>>> because of network problems which affected the
> > TCP
> > > > > > > > session.  In that
> > > > > > > > > >>>>>>>>>>>> case, the follower must establish a new TCP
> > session
> > > > and
> > > > > > > > re-send the
> > > > > > > > > >>>>>>>>>>>> incremental fetch request.  But the leader does
> > not
> > > > know
> > > > > > > > that the
> > > > > > > > > >>>>>>>>>>>> follower didn't receive the previous incremental
> > > > fetch
> > > > > > > > response.  It is
> > > > > > > > > >>>>>>>>>>>> only the incremental fetch epoch which lets the
> > > > leader
> > > > > > know
> > > > > > > > that it
> > > > > > > > > >>>>>>>>>>>> needs to resend that data, and not data which
> > comes
> > > > > > > > afterwards.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> You could construct similar scenarios with
> > message
> > > > > > > > reordering,
> > > > > > > > > >>>>>>>>>>>> duplication, etc.  Basically, this is a stateful
> > > > > > protocol
> > > > > > > > on an
> > > > > > > > > >>>>>>>>>>>> unreliable network, and you need to know
> > whether the
> > > > > > > > follower got the
> > > > > > > > > >>>>>>>>>>>> previous data you sent before you move on.  And
> > you
> > > > > > need to
> > > > > > > > handle
> > > > > > > > > >>>>>>>>>>>> issues like duplicated or delayed requests.
> > These
> > > > > > issues
> > > > > > > > do not affect
> > > > > > > > > >>>>>>>>>>>> the full fetch request, because it is not
> > > > stateful-- any
> > > > > > > > full fetch
> > > > > > > > > >>>>>>>>>>>> request can be understood and properly
> > responded to
> > > > in
> > > > > > > > isolation.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Thanks for the explanation. This makes sense. On
> > the
> > > > > > other
> > > > > > > > hand I would
> > > > > > > > > >>>>>>>>>>> be interested in learning more about whether
> > Becket's
> > > > > > > > solution can help
> > > > > > > > > >>>>>>>>>>> simplify the protocol by not having the echo
> > field
> > > > and
> > > > > > > > whether that is
> > > > > > > > > >>>>>>>>>>> worth doing.
> > > > > > > > > >>>>>>>>>> Hi Dong,
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> I commented about this in the other thread.  A
> > > > solution
> > > > > > which
> > > > > > > > doesn't
> > > > > > > > > >>>>>>>>>> maintain session information doesn't work here.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest
> > will
> > > > > > include
> > > > > > > > partitions
> > > > > > > > > >>>>>>>>>> whose
> > > > > > > > > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes
> > has
> > > > been
> > > > > > > > changed. If
> > > > > > > > > >>>>>>>>>>>>> follower's logStartOffet of a partition has
> > > > changed,
> > > > > > > > should this
> > > > > > > > > >>>>>>>>>>>>> partition also be included in the next
> > > > FetchRequest to
> > > > > > the
> > > > > > > > leader?
> > > > > > > > > >>>>>>>>>>>> Otherwise, it
> > > > > > > > > >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest
> > > > because
> > > > > > > > leader may
> > > > > > > > > >>>>>>>>>> not
> > > > > > > > > >>>>>>>>>>>> know
> > > > > > > > > >>>>>>>>>>>>> the corresponding data has been deleted on the
> > > > > > follower.
> > > > > > > > > >>>>>>>>>>>> Yeah, the follower should include the partition
> > if
> > > > the
> > > > > > > > logStartOffset
> > > > > > > > > >>>>>>>>>>>> has changed.  That should be spelled out on the
> > KIP.
> > > > > > Fixed.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a
> > > > partition is
> > > > > > not
> > > > > > > > considered
> > > > > > > > > >>>>>>>>>>>>> dirty if its log start offset has changed.
> > Later
> > > > in the
> > > > > > > > section
> > > > > > > > > >>>>>>>>>>>> "FetchRequest
> > > > > > > > > >>>>>>>>>>>>> Changes", it is said that incremental fetch
> > > > responses
> > > > > > will
> > > > > > > > include a
> > > > > > > > > >>>>>>>>>>>>> partition if its logStartOffset has changed. It
> > > > seems
> > > > > > > > inconsistent.
> > > > > > > > > >>>>>>>>>> Can
> > > > > > > > > >>>>>>>>>>>>> you update the KIP to clarify it?
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say
> > > > that
> > > > > > > > logStartOffset
> > > > > > > > > >>>>>>>>>>>> changes make a partition dirty, though, right?
> > The
> > > > > > first
> > > > > > > > bullet point
> > > > > > > > > >>>>>>>>>>>> is:
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this
> > > > changes the
> > > > > > > > log start
> > > > > > > > > >>>>>>>>>> offset
> > > > > > > > > >>>>>>>>>>>> of the partition on the leader., or
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Ah I see. I think I didn't notice this because
> > > > statement
> > > > > > > > assumes that the
> > > > > > > > > >>>>>>>>>>> LogStartOffset in the leader only changes due to
> > > > > > LogCleaner.
> > > > > > > > In fact the
> > > > > > > > > >>>>>>>>>>> LogStartOffset can change on the leader due to
> > > > either log
> > > > > > > > retention and
> > > > > > > > > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether
> > > > > > LogCleaner
> > > > > > > > can change
> > > > > > > > > >>>>>>>>>>> LogStartOffset though. It may be a bit better to
> > > > just say
> > > > > > > > that a
> > > > > > > > > >>>>>>>>>>> partition is considered dirty if LogStartOffset
> > > > changes.
> > > > > > > > > >>>>>>>>>> I agree.  It should be straightforward to just
> > resend
> > > > the
> > > > > > > > partition if
> > > > > > > > > >>>>>>>>>> logStartOffset changes.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is
> > said
> > > > that
> > > > > > > > each broker
> > > > > > > > > >>>>>>>>>> has a
> > > > > > > > > >>>>>>>>>>>>> limited number of slots. How is this number
> > > > determined?
> > > > > > > > Does this
> > > > > > > > > >>>>>>>>>> require
> > > > > > > > > >>>>>>>>>>>>> a new broker config for this number?
> > > > > > > > > >>>>>>>>>>>> Good point.  I added two broker configuration
> > > > > > parameters to
> > > > > > > > control
> > > > > > > > > >>>>>>>>>> this
> > > > > > > > > >>>>>>>>>>>> number.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> I am curious to see whether we can avoid some of
> > > > these
> > > > > > new
> > > > > > > > configs. For
> > > > > > > > > >>>>>>>>>>> example, incremental.fetch.session.
> > > > > > cache.slots.per.broker
> > > > > > > > is probably
> > > > > > > > > >>>>>>>>>> not
> > > > > > > > > >>>>>>>>>>> necessary because if a leader knows that a
> > > > FetchRequest
> > > > > > > > comes from a
> > > > > > > > > >>>>>>>>>>> follower, we probably want the leader to always
> > > > cache the
> > > > > > > > information
> > > > > > > > > >>>>>>>>>>> from that follower. Does this make sense?
> > > > > > > > > >>>>>>>>>> Yeah, maybe we can avoid having
> > > > > > > > > >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Maybe we can discuss the config later after
> > there is
> > > > > > > > agreement on how the
> > > > > > > > > >>>>>>>>>>> protocol would look like.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> What is the error code if broker does
> > > > > > > > > >>>>>>>>>>>>> not have new log for the incoming FetchRequest?
> > > > > > > > > >>>>>>>>>>>> Hmm, is there a typo in this question?  Maybe
> > you
> > > > meant
> > > > > > to
> > > > > > > > ask what
> > > > > > > > > >>>>>>>>>>>> happens if there is no new cache slot for the
> > > > incoming
> > > > > > > > FetchRequest?
> > > > > > > > > >>>>>>>>>>>> That's not an error-- the incremental fetch
> > session
> > > > ID
> > > > > > just
> > > > > > > > gets set to
> > > > > > > > > >>>>>>>>>>>> 0, indicating no incremental fetch session was
> > > > created.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Yeah there is a typo. You have answered my
> > question.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> 5. Can you clarify what happens if follower
> > adds a
> > > > > > > > partition to the
> > > > > > > > > >>>>>>>>>>>>> ReplicaFetcherThread after receiving
> > > > > > LeaderAndIsrRequest?
> > > > > > > > Does leader
> > > > > > > > > >>>>>>>>>>>>> needs to generate a new session for this
> > > > > > > > ReplicaFetcherThread or
> > > > > > > > > >>>>>>>>>> does it
> > > > > > > > > >>>>>>>>>>>> re-use
> > > > > > > > > >>>>>>>>>>>>> the existing session?  If it uses a new
> > session,
> > > > is the
> > > > > > > > old session
> > > > > > > > > >>>>>>>>>>>>> actively deleted from the slot?
> > > > > > > > > >>>>>>>>>>>> The basic idea is that you can't make changes,
> > > > except by
> > > > > > > > sending a full
> > > > > > > > > >>>>>>>>>>>> fetch request.  However, perhaps we can allow
> > the
> > > > > > client to
> > > > > > > > re-use its
> > > > > > > > > >>>>>>>>>>>> existing session ID.  If the client sets
> > sessionId
> > > > = id,
> > > > > > > > epoch = 0, it
> > > > > > > > > >>>>>>>>>>>> could re-initialize the session.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Yeah I agree with the basic idea. We probably
> > want to
> > > > > > > > understand more
> > > > > > > > > >>>>>>>>>>> detail about how this works later.
> > > > > > > > > >>>>>>>>>> Sounds good.  I updated the KIP with this
> > > > information.  A
> > > > > > > > > >>>>>>>>>> re-initialization should be exactly the same as an
> > > > > > > > initialization,
> > > > > > > > > >>>>>>>>>> except that it reuses an existing ID.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> best,
> > > > > > > > > >>>>>>>>>> Colin
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can
> > > > include
> > > > > > the
> > > > > > > > example
> > > > > > > > > >>>>>>>>>> workflow
> > > > > > > > > >>>>>>>>>>>>> of how this feature will be used in case of
> > > > partition
> > > > > > > > change and so
> > > > > > > > > >>>>>>>>>> on.
> > > > > > > > > >>>>>>>>>>>> Yeah, that might help.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> best,
> > > > > > > > > >>>>>>>>>>>> Colin
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> Thanks,
> > > > > > > > > >>>>>>>>>>>>> Dong
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<
> > > > > > > > cmccabe@apache.org>
> > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> I updated the KIP with the ideas we've been
> > > > > > discussing.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> best,
> > > > > > > > > >>>>>>>>>>>>>> Colin
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe
> > > > wrote:
> > > > > > > > > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak
> > > > wrote:
> > > > > > > > > >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can
> > > > become a
> > > > > > > > really
> > > > > > > > > >>>>>>>>>> useful
> > > > > > > > > >>>>>>>>>>>> thing.
> > > > > > > > > >>>>>>>>>>>>>>>> I just scanned through the discussion so
> > far and
> > > > > > wanted
> > > > > > > > to
> > > > > > > > > >>>>>>>>>> start a
> > > > > > > > > >>>>>>>>>>>>>>>> thread to make as decision about keeping the
> > > > > > > > > >>>>>>>>>>>>>>>> cache with the Connection / Session or
> > having
> > > > some
> > > > > > sort
> > > > > > > > of UUID
> > > > > > > > > >>>>>>>>>>>> indN
> > > > > > > > > >>>>>>>>>>>>>> exed
> > > > > > > > > >>>>>>>>>>>>>>>> global Map.
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Sorry if that has been settled already and I
> > > > missed
> > > > > > it.
> > > > > > > > In this
> > > > > > > > > >>>>>>>>>>>> case
> > > > > > > > > >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> > > > > > > > > >>>>>>>>>>>>>>> Hi Jan,
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> I don't think anyone has discussed the idea
> > of
> > > > tying
> > > > > > the
> > > > > > > > cache
> > > > > > > > > >>>>>>>>>> to an
> > > > > > > > > >>>>>>>>>>>>>>> individual TCP session yet.  I agree that
> > since
> > > > the
> > > > > > > > cache is
> > > > > > > > > >>>>>>>>>>>> intended to
> > > > > > > > > >>>>>>>>>>>>>>> be used only by a single follower or client,
> > > > it's an
> > > > > > > > interesting
> > > > > > > > > >>>>>>>>>>>> thing
> > > > > > > > > >>>>>>>>>>>>>>> to think about.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that
> > whenever
> > > > > > your
> > > > > > > > TCP
> > > > > > > > > >>>>>>>>>> session
> > > > > > > > > >>>>>>>>>>>>>>> drops, you have to make a full fetch request
> > > > rather
> > > > > > than
> > > > > > > > an
> > > > > > > > > >>>>>>>>>>>> incremental
> > > > > > > > > >>>>>>>>>>>>>>> one.  It's not clear to me how often this
> > > > happens in
> > > > > > > > practice --
> > > > > > > > > >>>>>>>>>> it
> > > > > > > > > >>>>>>>>>>>>>>> probably depends a lot on the quality of the
> > > > network.
> > > > > > > > From a
> > > > > > > > > >>>>>>>>>> code
> > > > > > > > > >>>>>>>>>>>>>>> perspective, it might also be a bit
> > difficult to
> > > > > > access
> > > > > > > > data
> > > > > > > > > >>>>>>>>>>>> associated
> > > > > > > > > >>>>>>>>>>>>>>> with the Session from classes like KafkaApis
> > > > > > (although
> > > > > > > > we could
> > > > > > > > > >>>>>>>>>>>> refactor
> > > > > > > > > >>>>>>>>>>>>>>> it to make this easier).
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> It's also clear that even if we tie the
> > cache to
> > > > the
> > > > > > > > session, we
> > > > > > > > > >>>>>>>>>>>> still
> > > > > > > > > >>>>>>>>>>>>>>> have to have limits on the number of caches
> > we're
> > > > > > > > willing to
> > > > > > > > > >>>>>>>>>> create.
> > > > > > > > > >>>>>>>>>>>>>>> And probably we should reserve some cache
> > slots
> > > > for
> > > > > > each
> > > > > > > > > >>>>>>>>>> follower, so
> > > > > > > > > >>>>>>>>>>>>>>> that clients don't take all of them.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Id rather see a protocol in which the
> > client is
> > > > > > hinting
> > > > > > > > the
> > > > > > > > > >>>>>>>>>> broker
> > > > > > > > > >>>>>>>>>>>>>> that,
> > > > > > > > > >>>>>>>>>>>>>>>> he is going to use the feature instead of a
> > > > client
> > > > > > > > > >>>>>>>>>>>>>>>> realizing that the broker just offered the
> > > > feature
> > > > > > > > (regardless
> > > > > > > > > >>>>>>>>>> of
> > > > > > > > > >>>>>>>>>>>>>>>> protocol version which should only indicate
> > > > that the
> > > > > > > > feature
> > > > > > > > > >>>>>>>>>>>>>>>> would be usable).
> > > > > > > > > >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by
> > "hinting."
> > > > I do
> > > > > > > > think that
> > > > > > > > > >>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>> server should have the option of not
> > accepting
> > > > > > > > incremental
> > > > > > > > > >>>>>>>>>> requests
> > > > > > > > > >>>>>>>>>>>> from
> > > > > > > > > >>>>>>>>>>>>>>> specific clients, in order to save memory
> > space.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> This seems to work better with a per
> > > > > > > > > >>>>>>>>>>>>>>>> connection/session attached Metadata than
> > with
> > > > a Map
> > > > > > > > and could
> > > > > > > > > >>>>>>>>>>>> allow
> > > > > > > > > >>>>>>>>>>>>>> for
> > > > > > > > > >>>>>>>>>>>>>>>> easier client implementations.
> > > > > > > > > >>>>>>>>>>>>>>>> It would also make Client-side code easier
> > as
> > > > there
> > > > > > > > wouldn't
> > > > > > > > > >>>>>>>>>> be any
> > > > > > > > > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > > > > > >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss
> > > > > > responses, I
> > > > > > > > agree.
> > > > > > > > > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most
> > of
> > > > our
> > > > > > > > client-side
> > > > > > > > > >>>>>>>>>> code.
> > > > > > > > > >>>>>>>>>>>>>>> For example, when the Producer creates a
> > message
> > > > and
> > > > > > > > hands it
> > > > > > > > > >>>>>>>>>> off to
> > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently
> > > > re-connect
> > > > > > and
> > > > > > > > re-send a
> > > > > > > > > >>>>>>>>>>>>>>> message if the first send failed.  The
> > > > higher-level
> > > > > > code
> > > > > > > > will
> > > > > > > > > >>>>>>>>>> not be
> > > > > > > > > >>>>>>>>>>>>>>> informed about whether the TCP session was
> > > > > > > > re-established,
> > > > > > > > > >>>>>>>>>> whether an
> > > > > > > > > >>>>>>>>>>>>>>> existing TCP session was used, and so on.  So
> > > > > > overall I
> > > > > > > > would
> > > > > > > > > >>>>>>>>>> still
> > > > > > > > > >>>>>>>>>>>> lean
> > > > > > > > > >>>>>>>>>>>>>>> towards not coupling this to the TCP
> > session...
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> best,
> > > > > > > > > >>>>>>>>>>>>>>> Colin
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>       Thank you again for the KIP. And
> > again, if
> > > > > > this
> > > > > > > > was clarified
> > > > > > > > > >>>>>>>>>>>> already
> > > > > > > > > >>>>>>>>>>>>>>>> please drop me a hint where I could read
> > about
> > > > it.
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Best Jan
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability
> > and
> > > > > > latency
> > > > > > > > of
> > > > > > > > > >>>>>>>>>>>>>> FetchRequest:
> > > > > > > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+
> > > > > > FetchRequests+to+Increase+
> > > > > > > > > >>>>>>>>>>>>>> Partition+Scalability
> > > > > > > > > >>>>>>>>>>>>>>>>> Please take a look.
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> cheers,
> > > > > > > > > >>>>>>>>>>>>>>>>> Colin
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> >

Mime
View raw message