kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Colin McCabe <cmcc...@apache.org>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Mon, 11 Dec 2017 23:03:48 GMT
On Mon, Dec 11, 2017, at 13:17, Dong Lin wrote:
> On Thu, Dec 7, 2017 at 1:52 PM, Colin McCabe <cmccabe@apache.org> wrote:
> 
> > On Wed, Dec 6, 2017, at 11:23, Becket Qin wrote:
> > > Hi Colin,
> > >
> > > >A full fetch request will certainly avoid any ambiguity here.  But now
> > > >we're back to sending full fetch requests whenever there are network
> > > >issues, which is worse than the current proposal.  And has the
> > > >congestion collapse problem I talked about earlier when the network is
> > > >wobbling.  We also don't get the other debuggability benefits of being
> > > >able to uniquely associate each update in the incremental fetch session
> > > >with a sequence number.
> > >
> > > I think we would want to optimize for the normal case instead of the
> > > failure case. The failure case is supposed to be rare and if that happens
> > > usually it requires human attention to fix anyways. So reducing the
> > > regular cost in the normal cases probably makes more sense.
> >
> 
> 
> Hmm.. let me chime in and ask a quick question on this.
> 
> My understanding of Becket's proposal is that the FetchRequest will not
> contain per-partition information in the normal cases. According to the
> latest KIP, it is said that "Incremental FetchRequests will only contain
> information about partitions which have changed on the follower". So if
> there is always data available for every partition on the broker, the
> FetchRequest will always contain per-partition information for every
> partition, which makes it essentially a full FetchRequest in normal case.
> Did I miss something here?

Hi Dong,

I think your understanding is correct.  The KIP-227 proposal includes
information about changed partitions in the partition fetch request.  If
every partition has changed, every partition will be included.

I don't think this is a problem.  For one thing, if every partition has
changed, then every partition will have data, which means you will have
a really large FetchResponse.  In that case, most of your network
bandwidth goes to the response anyway, rather than to the request.  And
you cannot get rid of that overhead, because you actually need to fetch
that data.

In any case, I am very skeptical that clusters that have information for
every partition on every fetch request exist in the wild. Remember that,
by default, we return a response to the fetch request when any partition
gets even a single byte of data.  Let's say you have 10,000 partitions. 
Do you have 10,000 produce requests being handled to each partition in
between each fetch request?  All the time?  So that somehow you can
service 10,000 Produce RPCs before you send back the response to a
single pending FetchRequest?  That's not very believable, especially
when you start thinking about internal topics.

I think the only way you could get reasonably close to a true fully
loaded fetch response is if you tuned Kafka for high latency and high
bandwidth.  So you could increase the wait time before sending back any
responses, and increase the minimum response size.  But that's not the
scenario we're addressing here.

best,
Colin

> 
> 
> 
> > >
> > > Thanks,
> >
> > Hi Becket,
> >
> > I agree we should optimize for the normal case.  I believe that the
> > sequence number proposal I put forward does this.  All the competing
> > proposals have been strictly worse for both the normal and error cases.
> > For example, the proposal to rely on the TCP session to establish
> > ordering does not help the normal case.  But it does make the case where
> > there are network issues worse.  It also makes it harder for us to put a
> > limit on the amount of time we will cache, which is worse for the normal
> > case.
> >
> > best,
> > Colin
> >
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Dec 6, 2017 at 10:58 AM, Colin McCabe <cmccabe@apache.org>
> > wrote:
> > >
> > > > On Wed, Dec 6, 2017, at 10:49, Jason Gustafson wrote:
> > > > > >
> > > > > > There is already a way in the existing proposal for clients to
> > change
> > > > > > the set of partitions they are interested in, while re-using their
> > same
> > > > > > session and session ID.  We don't need to change how sequence ID
> > works
> > > > > > in order to do this.
> > > > >
> > > > >
> > > > > There is some inconsistency in the KIP about this, so I wasn't sure.
> > In
> > > > > particular, you say this: " The FetchSession maintains information
> > about
> > > > > a specific set of relevant partitions.  Note that the set of relevant
> > > > > partitions is established when the FetchSession is created.  It
> > cannot be
> > > > > changed later." Maybe that could be clarified?
> > > >
> > > > That's a fair point-- I didn't fix this part of the KIP after making an
> > > > update below.  So it was definitely unclear.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > >
> > > > > > But how does the broker know that it needs to resend the data for
> > > > > > partition P?  After all, if the response had not been dropped, P
> > would
> > > > > > not have been resent, since it didn't change.  Under the existing
> > > > > > scheme, the follower can look at lastDirtyEpoch to find this out.
> >  In
> > > > > > the new scheme, I don't see how it would know.
> > > > >
> > > > >
> > > > > If a fetch response is lost, the epoch would be bumped by the client
> > and
> > > > > a
> > > > > full fetch would be sent. Doesn't that solve the issue?
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Wed, Dec 6, 2017 at 10:40 AM, Colin McCabe <cmccabe@apache.org>
> > > > wrote:
> > > > >
> > > > > > On Wed, Dec 6, 2017, at 09:32, Jason Gustafson wrote:
> > > > > > > >
> > > > > > > > Thinking about this again. I do see the reason that we want to
> > > > have a
> > > > > > epoch
> > > > > > > > to avoid out of order registration of the interested set. But
> > I am
> > > > > > > > wondering if the following semantic would meet what we want
> > better:
> > > > > > > >  - Session Id: the id assigned to a single client for life long
> > > > time.
> > > > > > i.e
> > > > > > > > it does not change when the interested partitions change.
> > > > > > > >  - Epoch: the interested set epoch. Only updated when a full
> > fetch
> > > > > > request
> > > > > > > > comes, which may result in the interested partition set change.
> > > > > > > > This will ensure that the registered interested set will
> > always be
> > > > the
> > > > > > > > latest registration. And the clients can change the interested
> > > > > > partition
> > > > > > > > set without creating another session.
> > > > > > >
> > > > > > >
> > > > > > > I agree this is a bit more intuitive than the sequence number
> > and the
> > > > > > > ability to reuse the session is beneficial since it causes less
> > > > waste of
> > > > > > > the cache for session timeouts.
> > > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > There is already a way in the existing proposal for clients to
> > change
> > > > > > the set of partitions they are interested in, while re-using their
> > same
> > > > > > session and session ID.  We don't need to change how sequence ID
> > works
> > > > > > in order to do this.
> > > > > >
> > > > > > > controlled by the client and a bump of the epoch indicates a full
> > > > fetch
> > > > > > > request. The client should also bump the epoch if it fails to
> > > > receive a
> > > > > > > fetch response. This ensures that the broker cannot receive an
> > old
> > > > > > > request after the client has reconnected and sent a new one which
> > > > > > > could cause an invalid session state.
> > > > > >
> > > > > > Hmm... I don't think this quite works.
> > > > > >
> > > > > > Let's suppose a broker sends out an incremental fetch response
> > > > > > containing new data for some partition P.  The sequence number of
> > the
> > > > > > fetch response is 100.  If the follower loses the response, under
> > this
> > > > > > proposed scheme, the follower bumps up the sequence number up to
> > 101
> > > > and
> > > > > > retries.
> > > > > >
> > > > > > But how does the broker know that it needs to resend the data for
> > > > > > partition P?  After all, if the response had not been dropped, P
> > would
> > > > > > not have been resent, since it didn't change.  Under the existing
> > > > > > scheme, the follower can look at lastDirtyEpoch to find this out.
> >  In
> > > > > > the new scheme, I don't see how it would know.
> > > > > >
> > > > > > In summary, the incremental fetch sequence ID is useful inside the
> > > > > > broker as well as outside it.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > > -Jason
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin <becket.qin@gmail.com
> > >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > That is true, but in reality it seems rare that the fetch size
> > is
> > > > > > smaller
> > > > > > > > than index interval. In the worst case, we may need to do
> > another
> > > > look
> > > > > > up.
> > > > > > > > In the future, when we have the mechanism to inform the clients
> > > > about
> > > > > > the
> > > > > > > > broker configurations, the clients may want to configure
> > > > > > correspondingly as
> > > > > > > > well, e.g. max message size, max timestamp difference, etc.
> > > > > > > >
> > > > > > > > On the other hand, we are not guaranteeing that the returned
> > bytes
> > > > in a
> > > > > > > > partition is always bounded by the per partition fetch size,
> > > > because
> > > > > > we are
> > > > > > > > going to return at least one message, so the per partition
> > fetch
> > > > size
> > > > > > seems
> > > > > > > > already a soft limit. Since we are introducing a new fetch
> > > > protocol and
> > > > > > > > this is related, it might be worth considering this option.
> > > > > > > >
> > > > > > > > BTW, one reason I bring this up again was because yesterday we
> > had
> > > > a
> > > > > > > > presentation from Uber regarding the end to end latency. And
> > they
> > > > are
> > > > > > > > seeing this binary search behavior impacting the latency due to
> > > > page
> > > > > > in/out
> > > > > > > > of the index file.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao <jun@confluent.io>
> > wrote:
> > > > > > > >
> > > > > > > > > Hi, Jiangjie,
> > > > > > > > >
> > > > > > > > > Not sure returning the fetch response at the index boundary
> > is a
> > > > > > general
> > > > > > > > > solution. The index interval is configurable. If one
> > configures
> > > > the
> > > > > > index
> > > > > > > > > interval larger than the per partition fetch size, we
> > probably
> > > > have
> > > > > > to
> > > > > > > > > return data not at the index boundary.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin <
> > becket.qin@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Colin,
> > > > > > > > > >
> > > > > > > > > > Thinking about this again. I do see the reason that we
> > want to
> > > > > > have a
> > > > > > > > > epoch
> > > > > > > > > > to avoid out of order registration of the interested set.
> > But
> > > > I am
> > > > > > > > > > wondering if the following semantic would meet what we want
> > > > better:
> > > > > > > > > >  - Session Id: the id assigned to a single client for life
> > long
> > > > > > time.
> > > > > > > > i.e
> > > > > > > > > > it does not change when the interested partitions change.
> > > > > > > > > >  - Epoch: the interested set epoch. Only updated when a
> > full
> > > > fetch
> > > > > > > > > request
> > > > > > > > > > comes, which may result in the interested partition set
> > change.
> > > > > > > > > > This will ensure that the registered interested set will
> > > > always be
> > > > > > the
> > > > > > > > > > latest registration. And the clients can change the
> > interested
> > > > > > > > partition
> > > > > > > > > > set without creating another session.
> > > > > > > > > >
> > > > > > > > > > Also I want to bring up the way the leader respond to the
> > > > > > FetchRequest
> > > > > > > > > > again. I think it would be a big improvement if we just
> > return
> > > > the
> > > > > > > > > > responses at index entry boundary or log end. There are a
> > few
> > > > > > benefits:
> > > > > > > > > > 1. The leader does not need the follower to provide the
> > > > offsets,
> > > > > > > > > > 2. The fetch requests no longer need to do a binary search
> > on
> > > > the
> > > > > > > > index,
> > > > > > > > > it
> > > > > > > > > > just need to do a linear access to the index file, which is
> > > > much
> > > > > > cache
> > > > > > > > > > friendly.
> > > > > > > > > >
> > > > > > > > > > Assuming the leader can get the last returned offsets to
> > the
> > > > > > clients
> > > > > > > > > > cheaply, I am still not sure why it is necessary for the
> > > > followers
> > > > > > to
> > > > > > > > > > repeat the offsets in the incremental fetch every time.
> > > > > > Intuitively it
> > > > > > > > > > should only update the offsets when the leader has wrong
> > > > offsets,
> > > > > > in
> > > > > > > > most
> > > > > > > > > > cases, the incremental fetch request should just be empty.
> > > > > > Otherwise we
> > > > > > > > > may
> > > > > > > > > > not be saving much when there are continuous small requests
> > > > going
> > > > > > to
> > > > > > > > each
> > > > > > > > > > partition, which could be normal for some low latency
> > systems.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe <
> > > > cmccabe@apache.org>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > > > > > > Hi Colin
> > > > > > > > > > > >
> > > > > > > > > > > > Addressing the topic of how to manage slots from the
> > other
> > > > > > thread.
> > > > > > > > > > > > With tcp connections all this comes for free
> > essentially.
> > > > > > > > > > >
> > > > > > > > > > > Hi Jan,
> > > > > > > > > > >
> > > > > > > > > > > I don't think that it's accurate to say that cache
> > management
> > > > > > "comes
> > > > > > > > > for
> > > > > > > > > > > free" by coupling the incremental fetch session with the
> > TCP
> > > > > > session.
> > > > > > > > > > > When a new TCP session is started by a fetch request, you
> > > > still
> > > > > > have
> > > > > > > > to
> > > > > > > > > > > decide whether to grant that request an incremental fetch
> > > > > > session or
> > > > > > > > > > > not.  If your answer is that you always grant the
> > request, I
> > > > > > would
> > > > > > > > > argue
> > > > > > > > > > > that you do not have cache management.
> > > > > > > > > > >
> > > > > > > > > > > I guess you could argue that timeouts are cache
> > management,
> > > > but I
> > > > > > > > don't
> > > > > > > > > > > find that argument persuasive.  Anyone could just create
> > a
> > > > lot
> > > > > > of TCP
> > > > > > > > > > > sessions and use a lot of resources, in that case.  So
> > there
> > > > is
> > > > > > > > > > > essentially no limit on memory use.  In any case, TCP
> > > > sessions
> > > > > > don't
> > > > > > > > > > > help us implement fetch session timeouts.
> > > > > > > > > > >
> > > > > > > > > > > > I still would argue we disable it by default and make a
> > > > flag
> > > > > > in the
> > > > > > > > > > > > broker to ask the leader to maintain the cache while
> > > > > > replicating
> > > > > > > > and
> > > > > > > > > > > also only
> > > > > > > > > > > > have it optional in consumers (default to off) so one
> > can
> > > > turn
> > > > > > it
> > > > > > > > on
> > > > > > > > > > > > where it really hurts.  MirrorMaker and audit consumers
> > > > > > > > prominently.
> > > > > > > > > > >
> > > > > > > > > > > I agree with Jason's point from earlier in the thread.
> > > > Adding
> > > > > > extra
> > > > > > > > > > > configuration knobs that aren't really necessary can harm
> > > > > > usability.
> > > > > > > > > > > Certainly asking people to manually turn on a feature
> > "where
> > > > it
> > > > > > > > really
> > > > > > > > > > > hurts" seems to fall in that category, when we could
> > easily
> > > > > > enable it
> > > > > > > > > > > automatically for them.
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Otherwise I left a few remarks in-line, which should
> > help
> > > > to
> > > > > > > > > understand
> > > > > > > > > > > > my view of the situation better
> > > > > > > > > > > >
> > > > > > > > > > > > Best Jan
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > > > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > > > > > > > >>>> Thanks for the explanation, Colin. A few more
> > > > questions.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>> The session epoch is not complex.  It's just a
> > number
> > > > > > which
> > > > > > > > > > > increments
> > > > > > > > > > > > >>>>> on each incremental fetch.  The session epoch is
> > also
> > > > > > useful
> > > > > > > > > for
> > > > > > > > > > > > >>>>> debugging-- it allows you to match up requests
> > and
> > > > > > responses
> > > > > > > > > when
> > > > > > > > > > > > >>>>> looking at log files.
> > > > > > > > > > > > >>>> Currently each request in Kafka has a correlation
> > id
> > > > to
> > > > > > help
> > > > > > > > > match
> > > > > > > > > > > the
> > > > > > > > > > > > >>>> requests and responses. Is epoch doing something
> > > > > > differently?
> > > > > > > > > > > > >>> Hi Becket,
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> The correlation ID is used within a single TCP
> > > > session, to
> > > > > > > > > uniquely
> > > > > > > > > > > > >>> associate a request with a response.  The
> > correlation
> > > > ID
> > > > > > is not
> > > > > > > > > > > unique
> > > > > > > > > > > > >>> (and has no meaning) outside the context of that
> > > > single TCP
> > > > > > > > > > session.
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Keep in mind, NetworkClient is in charge of TCP
> > > > sessions,
> > > > > > and
> > > > > > > > > > > generally
> > > > > > > > > > > > >>> tries to hide that information from the upper
> > layers
> > > > of the
> > > > > > > > code.
> > > > > > > > > > So
> > > > > > > > > > > > >>> when you submit a request to NetworkClient, you
> > don't
> > > > know
> > > > > > if
> > > > > > > > > that
> > > > > > > > > > > > >>> request creates a TCP session, or reuses an
> > existing
> > > > one.
> > > > > > > > > > > > >>>>> Unfortunately, this doesn't work.  Imagine the
> > client
> > > > > > misses
> > > > > > > > an
> > > > > > > > > > > > >>>>> increment fetch response about a partition.  And
> > > > then the
> > > > > > > > > > > partition is
> > > > > > > > > > > > >>>>> never updated after that.  The client has no way
> > to
> > > > know
> > > > > > > > about
> > > > > > > > > > the
> > > > > > > > > > > > >>>>> partition, since it won't be included in any
> > future
> > > > > > > > incremental
> > > > > > > > > > > fetch
> > > > > > > > > > > > >>>>> responses.  And there are no offsets to compare,
> > > > since
> > > > > > the
> > > > > > > > > > > partition is
> > > > > > > > > > > > >>>>> simply omitted from the response.
> > > > > > > > > > > > >>>> I am curious about in which situation would the
> > > > follower
> > > > > > miss
> > > > > > > > a
> > > > > > > > > > > response
> > > > > > > > > > > > >>>> of a partition. If the entire FetchResponse is
> > lost
> > > > (e.g.
> > > > > > > > > > timeout),
> > > > > > > > > > > the
> > > > > > > > > > > > >>>> follower would disconnect and retry. That will
> > result
> > > > in
> > > > > > > > > sending a
> > > > > > > > > > > full
> > > > > > > > > > > > >>>> FetchRequest.
> > > > > > > > > > > > >>> Basically, you are proposing that we rely on TCP
> > for
> > > > > > reliable
> > > > > > > > > > > delivery
> > > > > > > > > > > > >>> in a distributed system.  That isn't a good idea
> > for a
> > > > > > bunch of
> > > > > > > > > > > > >>> different reasons.  First of all, TCP timeouts
> > tend to
> > > > be
> > > > > > very
> > > > > > > > > > > long.  So
> > > > > > > > > > > > >>> if the TCP session timing out is your error
> > detection
> > > > > > > > mechanism,
> > > > > > > > > > you
> > > > > > > > > > > > >>> have to wait minutes for messages to timeout.  Of
> > > > course,
> > > > > > we
> > > > > > > > add
> > > > > > > > > a
> > > > > > > > > > > > >>> timeout on top of that after which we declare the
> > > > > > connection
> > > > > > > > bad
> > > > > > > > > > and
> > > > > > > > > > > > >>> manually close it.  But just because the session is
> > > > closed
> > > > > > on
> > > > > > > > one
> > > > > > > > > > end
> > > > > > > > > > > > >>> doesn't mean that the other end knows that it is
> > > > closed.
> > > > > > So
> > > > > > > > the
> > > > > > > > > > > leader
> > > > > > > > > > > > >>> may have to wait quite a long time before TCP
> > decides
> > > > that
> > > > > > yes,
> > > > > > > > > > > > >>> connection X from the follower is dead and not
> > coming
> > > > back,
> > > > > > > > even
> > > > > > > > > > > though
> > > > > > > > > > > > >>> gremlins ate the FIN packet which the follower
> > > > attempted to
> > > > > > > > > > > translate.
> > > > > > > > > > > > >>> If the cache state is tied to that TCP session, we
> > > > have to
> > > > > > keep
> > > > > > > > > > that
> > > > > > > > > > > > >>> cache around for a much longer time than we should.
> > > > > > > > > > > > >> Hi,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I see this from a different perspective. The cache
> > > > expiry
> > > > > > time
> > > > > > > > > > > > >> has the same semantic as idle connection time in
> > this
> > > > > > scenario.
> > > > > > > > > > > > >> It is the time range we expect the client to come
> > back
> > > > an
> > > > > > reuse
> > > > > > > > > > > > >> its broker side state. I would argue that on close
> > we
> > > > would
> > > > > > get
> > > > > > > > an
> > > > > > > > > > > > >> extra shot at cleaning up the session state early.
> > As
> > > > > > opposed to
> > > > > > > > > > > > >> always wait for that duration for expiry to happen.
> > > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > > >
> > > > > > > > > > > > > The idea here is that the incremental fetch cache
> > expiry
> > > > > > time can
> > > > > > > > > be
> > > > > > > > > > > > > much shorter than the TCP session timeout.  In
> > general
> > > > the
> > > > > > TCP
> > > > > > > > > > session
> > > > > > > > > > > > > timeout is common to all TCP connections, and very
> > > > long.  To
> > > > > > make
> > > > > > > > > > these
> > > > > > > > > > > > > numbers a little more concrete, the TCP session
> > timeout
> > > > is
> > > > > > often
> > > > > > > > > > > > > configured to be 2 hours on Linux.  (See
> > > > > > > > > > > > > https://www.cyberciti.biz/tips/linux-increasing-or-
> > > > > > > > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > > > > > > > > )  The timeout I was proposing for incremental fetch
> > > > > > sessions was
> > > > > > > > > one
> > > > > > > > > > > or
> > > > > > > > > > > > > two minutes at most.
> > > > > > > > > > > > Currently this is taken care of by
> > > > > > > > > > > > connections.max.idle.ms on the broker and defaults to
> > > > > > something of
> > > > > > > > > few
> > > > > > > > > > > > minutes.
> > > > > > > > > > >
> > > > > > > > > > > It is 10 minutes by default, which is longer than what we
> > > > want
> > > > > > the
> > > > > > > > > > > incremental fetch session timeout to be.  There's no
> > reason
> > > > to
> > > > > > couple
> > > > > > > > > > > these two things.
> > > > > > > > > > >
> > > > > > > > > > > > Also something we could let the client change if we
> > really
> > > > > > wanted
> > > > > > > > to.
> > > > > > > > > > > > So there is no need to worry about coupling our
> > > > implementation
> > > > > > to
> > > > > > > > > some
> > > > > > > > > > > > timeouts given by the OS, with TCP one always has full
> > > > control
> > > > > > over
> > > > > > > > > the
> > > > > > > > > > > worst
> > > > > > > > > > > > times + one gets the extra shot cleaning up early when
> > the
> > > > > > close
> > > > > > > > > comes
> > > > > > > > > > > through.
> > > > > > > > > > > > Which is the majority of the cases.
> > > > > > > > > > >
> > > > > > > > > > > In the majority of cases, the TCP session will be
> > > > > > re-established.  In
> > > > > > > > > > > that case, we have to send a full fetch request rather
> > than
> > > > an
> > > > > > > > > > > incremental fetch request.
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >>> Secondly, from a software engineering perspective,
> > it's
> > > > > > not a
> > > > > > > > > good
> > > > > > > > > > > idea
> > > > > > > > > > > > >>> to try to tightly tie together TCP and our code.
> > We
> > > > would
> > > > > > have
> > > > > > > > > to
> > > > > > > > > > > > >>> rework how we interact with NetworkClient so that
> > we
> > > > are
> > > > > > aware
> > > > > > > > of
> > > > > > > > > > > things
> > > > > > > > > > > > >>> like TCP sessions closing or opening.  We would
> > have
> > > > to be
> > > > > > > > > careful
> > > > > > > > > > > > >>> preserve the ordering of incoming messages when
> > doing
> > > > > > things
> > > > > > > > like
> > > > > > > > > > > > >>> putting incoming requests on to a queue to be
> > > > processed by
> > > > > > > > > multiple
> > > > > > > > > > > > >>> threads.  It's just a lot of complexity to add, and
> > > > > > there's no
> > > > > > > > > > > upside.
> > > > > > > > > > > > >> I see the point here. And I had a small chat with
> > Dong
> > > > Lin
> > > > > > > > already
> > > > > > > > > > > > >> making me aware of this. I tried out the approaches
> > and
> > > > > > propose
> > > > > > > > > the
> > > > > > > > > > > > >> following:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> The client start and does a full fetch. It then does
> > > > > > incremental
> > > > > > > > > > > fetches.
> > > > > > > > > > > > >> The connection to the broker dies and is
> > re-established
> > > > by
> > > > > > > > > > > NetworkClient
> > > > > > > > > > > > >> under the hood.
> > > > > > > > > > > > >> The broker sees an incremental fetch without having
> > > > state =>
> > > > > > > > > returns
> > > > > > > > > > > > >> error:
> > > > > > > > > > > > >> Client sees the error, does a full fetch and goes
> > back
> > > > to
> > > > > > > > > > > incrementally
> > > > > > > > > > > > >> fetching.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> having this 1 additional error round trip is
> > > > essentially the
> > > > > > > > same
> > > > > > > > > as
> > > > > > > > > > > > >> when something
> > > > > > > > > > > > >> with the sessions or epoch changed unexpectedly to
> > the
> > > > > > client
> > > > > > > > (say
> > > > > > > > > > > > >> expiry).
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> So its nothing extra added but the conditions are
> > > > easier to
> > > > > > > > > > evaluate.
> > > > > > > > > > > > >> Especially since we do everything with
> > NetworkClient.
> > > > Other
> > > > > > > > > > > implementers
> > > > > > > > > > > > >> on the
> > > > > > > > > > > > >> protocol are free to optimizes this and do not do
> > the
> > > > > > errornours
> > > > > > > > > > > > >> roundtrip on the
> > > > > > > > > > > > >> new connection.
> > > > > > > > > > > > >> Its a great plus that the client can know when the
> > > > error is
> > > > > > > > gonna
> > > > > > > > > > > > >> happen. instead of
> > > > > > > > > > > > >> the server to always have to report back if
> > something
> > > > > > changes
> > > > > > > > > > > > >> unexpectedly for the client
> > > > > > > > > > > > > You are assuming that the leader and the follower
> > agree
> > > > that
> > > > > > the
> > > > > > > > > TCP
> > > > > > > > > > > > > session drops at the same time.  When there are
> > network
> > > > > > problems,
> > > > > > > > > > this
> > > > > > > > > > > > > may not be true.  The leader may still think the
> > > > previous TCP
> > > > > > > > > session
> > > > > > > > > > > is
> > > > > > > > > > > > > active.  In that case, we have to keep the
> > incremental
> > > > fetch
> > > > > > > > > session
> > > > > > > > > > > > > state around until we learn otherwise (which could
> > be up
> > > > to
> > > > > > that
> > > > > > > > 2
> > > > > > > > > > hour
> > > > > > > > > > > > > timeout I mentioned).  And if we get a new incoming
> > > > > > incremental
> > > > > > > > > fetch
> > > > > > > > > > > > > request, we can't assume that it replaces the
> > previous
> > > > one,
> > > > > > > > because
> > > > > > > > > > the
> > > > > > > > > > > > > IDs will be different (the new one starts a new
> > session).
> > > > > > > > > > > > As mentioned, no reason to fear some time-outs out of
> > our
> > > > > > control
> > > > > > > > > > > > >
> > > > > > > > > > > > >>> Imagine that I made an argument that client IDs are
> > > > > > "complex"
> > > > > > > > and
> > > > > > > > > > > should
> > > > > > > > > > > > >>> be removed from our APIs.  After all, we can just
> > look
> > > > at
> > > > > > the
> > > > > > > > > > remote
> > > > > > > > > > > IP
> > > > > > > > > > > > >>> address and TCP port of each connection.  Would you
> > > > think
> > > > > > that
> > > > > > > > > was
> > > > > > > > > > a
> > > > > > > > > > > > >>> good idea?  The client ID is useful when looking at
> > > > logs.
> > > > > > For
> > > > > > > > > > > example,
> > > > > > > > > > > > >>> if a rebalance is having problems, you want to know
> > > > what
> > > > > > > > clients
> > > > > > > > > > were
> > > > > > > > > > > > >>> having a problem.  So having the client ID field to
> > > > guide
> > > > > > you
> > > > > > > > is
> > > > > > > > > > > > >>> actually much less "complex" in practice than not
> > > > having
> > > > > > an ID.
> > > > > > > > > > > > >> I still cant follow why the correlation idea will
> > not
> > > > help
> > > > > > here.
> > > > > > > > > > > > >> Correlating logs with it usually works great. Even
> > with
> > > > > > > > primitive
> > > > > > > > > > > tools
> > > > > > > > > > > > >> like grep
> > > > > > > > > > > > > The correlation ID does help somewhat, but certainly
> > not
> > > > as
> > > > > > much
> > > > > > > > > as a
> > > > > > > > > > > > > unique 64-bit ID.  The correlation ID is not unique
> > in
> > > > the
> > > > > > > > broker,
> > > > > > > > > > just
> > > > > > > > > > > > > unique to a single NetworkClient.  Simiarly, the
> > > > correlation
> > > > > > ID
> > > > > > > > is
> > > > > > > > > > not
> > > > > > > > > > > > > unique on the client side, if there are multiple
> > > > Consumers,
> > > > > > etc.
> > > > > > > > > > > > Can always bump entropy in correlation IDs, never had a
> > > > problem
> > > > > > > > > > > > of finding to many duplicates. Would be a different KIP
> > > > though.
> > > > > > > > > > > > >
> > > > > > > > > > > > >>> Similarly, if metadata responses had epoch numbers
> > > > (simple
> > > > > > > > > > > incrementing
> > > > > > > > > > > > >>> numbers), we would not have to debug problems like
> > > > clients
> > > > > > > > > > > accidentally
> > > > > > > > > > > > >>> getting old metadata from servers that had been
> > > > > > partitioned off
> > > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > >>> network for a while.  Clients would know the
> > difference
> > > > > > between
> > > > > > > > > old
> > > > > > > > > > > and
> > > > > > > > > > > > >>> new metadata.  So putting epochs in to the metadata
> > > > > > request is
> > > > > > > > > much
> > > > > > > > > > > less
> > > > > > > > > > > > >>> "complex" operationally, even though it's an extra
> > > > field
> > > > > > in the
> > > > > > > > > > > request.
> > > > > > > > > > > > >>>    This has been discussed before on the mailing
> > list.
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> So I think the bottom line for me is that having
> > the
> > > > > > session ID
> > > > > > > > > and
> > > > > > > > > > > > >>> session epoch, while it adds two extra fields,
> > reduces
> > > > > > > > > operational
> > > > > > > > > > > > >>> complexity and increases debuggability.  It avoids
> > > > tightly
> > > > > > > > > coupling
> > > > > > > > > > > us
> > > > > > > > > > > > >>> to assumptions about reliable ordered delivery
> > which
> > > > tend
> > > > > > to be
> > > > > > > > > > > violated
> > > > > > > > > > > > >>> in practice in multiple layers of the stack.
> > Finally,
> > > > it
> > > > > > > > avoids
> > > > > > > > > > the
> > > > > > > > > > > > >>> necessity of refactoring NetworkClient.
> > > > > > > > > > > > >> So there is stacks out there that violate TCP
> > > > guarantees?
> > > > > > And
> > > > > > > > > > software
> > > > > > > > > > > > >> still works? How can this be? Can you elaborate a
> > little
> > > > > > where
> > > > > > > > > this
> > > > > > > > > > > > >> can be violated? I am not very familiar with
> > virtualized
> > > > > > > > > > environments
> > > > > > > > > > > > >> but they can't really violate TCP contracts.
> > > > > > > > > > > > > TCP's guarantees of reliable, in-order transmission
> > > > > > certainly can
> > > > > > > > > be
> > > > > > > > > > > > > violated.  For example, I once had to debug a cluster
> > > > where a
> > > > > > > > > certain
> > > > > > > > > > > > > node had a network card which corrupted its
> > transmissions
> > > > > > > > > > occasionally.
> > > > > > > > > > > > > With all the layers of checksums, you would think
> > that
> > > > this
> > > > > > was
> > > > > > > > not
> > > > > > > > > > > > > possible, but it happened.  We occasionally got
> > corrupted
> > > > > > data
> > > > > > > > > > written
> > > > > > > > > > > > > to disk on the other end because of it.  Even more
> > > > > > frustrating,
> > > > > > > > the
> > > > > > > > > > > data
> > > > > > > > > > > > > was not corrupted on disk on the sending node-- it
> > was a
> > > > bug
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > > > network card driver that was injecting the errors.
> > > > > > > > > > > > true, but your broker might aswell read a corrupted
> > 600GB
> > > > as
> > > > > > size
> > > > > > > > > from
> > > > > > > > > > > > the network and die with OOM instantly.
> > > > > > > > > > >
> > > > > > > > > > > If you read 600 GB as the size from the network, you
> > will not
> > > > > > "die
> > > > > > > > with
> > > > > > > > > > > OOM instantly."  That would be a bug.  Instead, you will
> > > > notice
> > > > > > that
> > > > > > > > > 600
> > > > > > > > > > > GB is greater than max.message.bytes, and close the
> > > > connection.
> > > > > > > > > > >
> > > > > > > > > > > > Optimizing for still having functional
> > > > > > > > > > > > software under this circumstances is not reasonable.
> > > > > > > > > > > > You want to get rid of such a
> > > > > > > > > > > > node ASAP and pray that zookeepers ticks get corrupted
> > > > often
> > > > > > enough
> > > > > > > > > > > > that it finally drops out of the cluster.
> > > > > > > > > > > >
> > > > > > > > > > > > There is a good reason that these kinda things
> > > > > > > > > > > > https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > > > > > > > don't end up as kafka Jiras. In the end you can't run
> > any
> > > > > > software
> > > > > > > > in
> > > > > > > > > > > > these containers anymore. Application layer checksums
> > are a
> > > > > > neat
> > > > > > > > > thing
> > > > > > > > > > to
> > > > > > > > > > > > fail fast but trying to cope with this probably causes
> > > > more bad
> > > > > > > > than
> > > > > > > > > > > > good.  So I would argue that we shouldn't try this for
> > the
> > > > > > fetch
> > > > > > > > > > > requests.
> > > > > > > > > > >
> > > > > > > > > > > One of the goals of Apache Kafka is to be "a streaming
> > > > > > platform...
> > > > > > > > > > > [that] lets you store streams of records in a
> > fault-tolerant
> > > > > > way."
> > > > > > > > For
> > > > > > > > > > > more information, see https://kafka.apache.org/intro .
> > > > > > > > > Fault-tolerance
> > > > > > > > > > > is explicitly part of the goal of Kafka.  Prayer should
> > be
> > > > > > optional,
> > > > > > > > > not
> > > > > > > > > > > required, when running the software.
> > > > > > > > > > >
> > > > > > > > > > > Crashing because someone sent you a bad packet is not
> > > > reasonable
> > > > > > > > > > > behavior.  It is a bug.  Similarly, bringing down the
> > whole
> > > > > > cluster,
> > > > > > > > > > > which could a hundred nodes, because someone had a bad
> > > > network
> > > > > > > > adapter
> > > > > > > > > > > is not reasonable behavior.  It is perhaps reasonable
> > for the
> > > > > > cluster
> > > > > > > > > to
> > > > > > > > > > > perform worse when hardware is having problems.  But
> > that's a
> > > > > > > > different
> > > > > > > > > > > discussion.
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > However, my point was not about TCP's guarantees
> > being
> > > > > > violated.
> > > > > > > > > My
> > > > > > > > > > > > > point is that TCP's guarantees are only one small
> > > > building
> > > > > > block
> > > > > > > > to
> > > > > > > > > > > > > build a robust distributed system.  TCP basically
> > just
> > > > says
> > > > > > that
> > > > > > > > if
> > > > > > > > > > you
> > > > > > > > > > > > > get any bytes from the stream, you will get the ones
> > that
> > > > > > were
> > > > > > > > sent
> > > > > > > > > > by
> > > > > > > > > > > > > the sender, in the order they were sent.  TCP does
> > not
> > > > > > guarantee
> > > > > > > > > that
> > > > > > > > > > > > > the bytes you send will get there.  It does not
> > guarantee
> > > > > > that if
> > > > > > > > > you
> > > > > > > > > > > > > close the connection, the other end will know about
> > it
> > > > in a
> > > > > > > > timely
> > > > > > > > > > > > > fashion.
> > > > > > > > > > > > These are very powerful grantees and since we use TCP
> > we
> > > > should
> > > > > > > > > > > > piggy pack everything that is reasonable on to it. IMO
> > > > there
> > > > > > is no
> > > > > > > > > > > > need to reimplement correct sequencing again if you get
> > > > that
> > > > > > from
> > > > > > > > > > > > your transport layer. It saves you the complexity, it
> > makes
> > > > > > > > > > > > you application behave way more naturally and your api
> > > > easier
> > > > > > to
> > > > > > > > > > > > understand.
> > > > > > > > > > > >
> > > > > > > > > > > > There is literally nothing the Kernel wont let you
> > decide
> > > > > > > > > > > > especially not any timings. Only noticeable exception
> > being
> > > > > > > > TIME_WAIT
> > > > > > > > > > > > of usually 240 seconds but that already has little todo
> > > > with
> > > > > > the
> > > > > > > > > broker
> > > > > > > > > > > > itself and
> > > > > > > > > > > > if we are running out of usable ports because of this
> > then
> > > > > > expiring
> > > > > > > > > > > > fetch requests
> > > > > > > > > > > > wont help much anyways.
> > > > > > > > > > > >
> > > > > > > > > > > > I hope I could strengthen the trust you have in
> > userland
> > > > TCP
> > > > > > > > > connection
> > > > > > > > > > > > management. It is really powerful and can be exploited
> > for
> > > > > > maximum
> > > > > > > > > > gains
> > > > > > > > > > > > without much risk in my opinion.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > It does not guarantee that the bytes will be
> > received in
> > > > a
> > > > > > > > > > > > > certain timeframe, and certainly doesn't guarantee
> > that
> > > > if
> > > > > > you
> > > > > > > > > send a
> > > > > > > > > > > > > byte on connection X and then on connection Y, that
> > the
> > > > > > remote
> > > > > > > > end
> > > > > > > > > > will
> > > > > > > > > > > > > read a byte on X before reading a byte on Y.
> > > > > > > > > > > > Noone expects this from two independent paths of any
> > kind.
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hope this made my view clearer, especially the first
> > > > part.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Best Jan
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> best,
> > > > > > > > > > > > >>> Colin
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>>> If there is an error such as
> > NotLeaderForPartition is
> > > > > > > > > > > > >>>> returned for some partitions, the follower can
> > always
> > > > > > send a
> > > > > > > > > full
> > > > > > > > > > > > >>>> FetchRequest. Is there a scenario that only some
> > of
> > > > the
> > > > > > > > > partitions
> > > > > > > > > > > in a
> > > > > > > > > > > > >>>> FetchResponse is lost?
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Jiangjie (Becket) Qin
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > > > > cmccabe@apache.org
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > > > > > > > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > > > >>>>> wrote:
> > > > > > > > > > > > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > > > > > > > > >>>>>>>> Hey Colin,
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> Thanks much for the update. I have a few
> > questions
> > > > > > below:
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> 1. I am not very sure that we need Fetch
> > Session
> > > > > > Epoch. It
> > > > > > > > > > > seems that
> > > > > > > > > > > > >>>>>>>> Fetch
> > > > > > > > > > > > >>>>>>>> Session Epoch is only needed to help leader
> > > > > > distinguish
> > > > > > > > > > between
> > > > > > > > > > > "a
> > > > > > > > > > > > >>>>> full
> > > > > > > > > > > > >>>>>>>> fetch request" and "a full fetch request and
> > > > request
> > > > > > a new
> > > > > > > > > > > > >>>>> incremental
> > > > > > > > > > > > >>>>>>>> fetch session". Alternatively, follower can
> > also
> > > > > > indicate
> > > > > > > > "a
> > > > > > > > > > > full
> > > > > > > > > > > > >>>>> fetch
> > > > > > > > > > > > >>>>>>>> request and request a new incremental fetch
> > > > session"
> > > > > > by
> > > > > > > > > > setting
> > > > > > > > > > > Fetch
> > > > > > > > > > > > >>>>>>>> Session ID to -1 without using Fetch Session
> > > > Epoch.
> > > > > > Does
> > > > > > > > > this
> > > > > > > > > > > make
> > > > > > > > > > > > >>>>> sense?
> > > > > > > > > > > > >>>>>>> Hi Dong,
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> The fetch session epoch is very important for
> > > > ensuring
> > > > > > > > > > > correctness.  It
> > > > > > > > > > > > >>>>>>> prevents corrupted or incomplete fetch data
> > due to
> > > > > > network
> > > > > > > > > > > reordering
> > > > > > > > > > > > >>>>> or
> > > > > > > > > > > > >>>>>>> loss.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> For example, consider a scenario where the
> > follower
> > > > > > sends a
> > > > > > > > > > fetch
> > > > > > > > > > > > >>>>>>> request to the leader.  The leader responds,
> > but
> > > > the
> > > > > > > > response
> > > > > > > > > > is
> > > > > > > > > > > lost
> > > > > > > > > > > > >>>>>>> because of network problems which affected the
> > TCP
> > > > > > session.
> > > > > > > > > In
> > > > > > > > > > > that
> > > > > > > > > > > > >>>>>>> case, the follower must establish a new TCP
> > > > session and
> > > > > > > > > re-send
> > > > > > > > > > > the
> > > > > > > > > > > > >>>>>>> incremental fetch request.  But the leader
> > does not
> > > > > > know
> > > > > > > > that
> > > > > > > > > > the
> > > > > > > > > > > > >>>>>>> follower didn't receive the previous
> > incremental
> > > > fetch
> > > > > > > > > > > response.  It is
> > > > > > > > > > > > >>>>>>> only the incremental fetch epoch which lets the
> > > > leader
> > > > > > know
> > > > > > > > > > that
> > > > > > > > > > > it
> > > > > > > > > > > > >>>>>>> needs to resend that data, and not data which
> > comes
> > > > > > > > > afterwards.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> You could construct similar scenarios with
> > message
> > > > > > > > > reordering,
> > > > > > > > > > > > >>>>>>> duplication, etc.  Basically, this is a
> > stateful
> > > > > > protocol
> > > > > > > > on
> > > > > > > > > an
> > > > > > > > > > > > >>>>>>> unreliable network, and you need to know
> > whether
> > > > the
> > > > > > > > follower
> > > > > > > > > > > got the
> > > > > > > > > > > > >>>>>>> previous data you sent before you move on.
> > And you
> > > > > > need to
> > > > > > > > > > > handle
> > > > > > > > > > > > >>>>>>> issues like duplicated or delayed requests.
> > These
> > > > > > issues
> > > > > > > > do
> > > > > > > > > > not
> > > > > > > > > > > affect
> > > > > > > > > > > > >>>>>>> the full fetch request, because it is not
> > > > stateful--
> > > > > > any
> > > > > > > > full
> > > > > > > > > > > fetch
> > > > > > > > > > > > >>>>>>> request can be understood and properly
> > responded
> > > > to in
> > > > > > > > > > isolation.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>> Thanks for the explanation. This makes sense.
> > On the
> > > > > > other
> > > > > > > > > hand
> > > > > > > > > > I
> > > > > > > > > > > would
> > > > > > > > > > > > >>>>>> be interested in learning more about whether
> > > > Becket's
> > > > > > > > solution
> > > > > > > > > > > can help
> > > > > > > > > > > > >>>>>> simplify the protocol by not having the echo
> > field
> > > > and
> > > > > > > > whether
> > > > > > > > > > > that is
> > > > > > > > > > > > >>>>>> worth doing.
> > > > > > > > > > > > >>>>> Hi Dong,
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> I commented about this in the other thread.  A
> > > > solution
> > > > > > which
> > > > > > > > > > > doesn't
> > > > > > > > > > > > >>>>> maintain session information doesn't work here.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>>>> 2. It is said that Incremental FetchRequest
> > will
> > > > > > include
> > > > > > > > > > > partitions
> > > > > > > > > > > > >>>>> whose
> > > > > > > > > > > > >>>>>>>> fetch offset or maximum number of fetch bytes
> > has
> > > > been
> > > > > > > > > > changed.
> > > > > > > > > > > If
> > > > > > > > > > > > >>>>>>>> follower's logStartOffet of a partition has
> > > > changed,
> > > > > > > > should
> > > > > > > > > > this
> > > > > > > > > > > > >>>>>>>> partition also be included in the next
> > > > FetchRequest
> > > > > > to the
> > > > > > > > > > > leader?
> > > > > > > > > > > > >>>>>>> Otherwise, it
> > > > > > > > > > > > >>>>>>>> may affect the handling of
> > DeleteRecordsRequest
> > > > > > because
> > > > > > > > > leader
> > > > > > > > > > > may
> > > > > > > > > > > > >>>>> not
> > > > > > > > > > > > >>>>>>> know
> > > > > > > > > > > > >>>>>>>> the corresponding data has been deleted on the
> > > > > > follower.
> > > > > > > > > > > > >>>>>>> Yeah, the follower should include the
> > partition if
> > > > the
> > > > > > > > > > > logStartOffset
> > > > > > > > > > > > >>>>>>> has changed.  That should be spelled out on the
> > > > KIP.
> > > > > > > > Fixed.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>>> 3. In the section "Per-Partition Data", a
> > > > partition
> > > > > > is not
> > > > > > > > > > > considered
> > > > > > > > > > > > >>>>>>>> dirty if its log start offset has changed.
> > Later
> > > > in
> > > > > > the
> > > > > > > > > > section
> > > > > > > > > > > > >>>>>>> "FetchRequest
> > > > > > > > > > > > >>>>>>>> Changes", it is said that incremental fetch
> > > > responses
> > > > > > will
> > > > > > > > > > > include a
> > > > > > > > > > > > >>>>>>>> partition if its logStartOffset has changed.
> > It
> > > > seems
> > > > > > > > > > > inconsistent.
> > > > > > > > > > > > >>>>> Can
> > > > > > > > > > > > >>>>>>>> you update the KIP to clarify it?
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>> In the "Per-Partition Data" section, it does
> > say
> > > > that
> > > > > > > > > > > logStartOffset
> > > > > > > > > > > > >>>>>>> changes make a partition dirty, though,
> > right?  The
> > > > > > first
> > > > > > > > > > bullet
> > > > > > > > > > > point
> > > > > > > > > > > > >>>>>>> is:
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>>> * The LogCleaner deletes messages, and this
> > > > changes
> > > > > > the
> > > > > > > > log
> > > > > > > > > > > start
> > > > > > > > > > > > >>>>> offset
> > > > > > > > > > > > >>>>>>> of the partition on the leader., or
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>> Ah I see. I think I didn't notice this because
> > > > statement
> > > > > > > > > assumes
> > > > > > > > > > > that the
> > > > > > > > > > > > >>>>>> LogStartOffset in the leader only changes due to
> > > > > > LogCleaner.
> > > > > > > > > In
> > > > > > > > > > > fact the
> > > > > > > > > > > > >>>>>> LogStartOffset can change on the leader due to
> > > > either
> > > > > > log
> > > > > > > > > > > retention and
> > > > > > > > > > > > >>>>>> DeleteRecordsRequest. I haven't verified whether
> > > > > > LogCleaner
> > > > > > > > > can
> > > > > > > > > > > change
> > > > > > > > > > > > >>>>>> LogStartOffset though. It may be a bit better to
> > > > just
> > > > > > say
> > > > > > > > > that a
> > > > > > > > > > > > >>>>>> partition is considered dirty if LogStartOffset
> > > > changes.
> > > > > > > > > > > > >>>>> I agree.  It should be straightforward to just
> > > > resend the
> > > > > > > > > > > partition if
> > > > > > > > > > > > >>>>> logStartOffset changes.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>>>> 4. In "Fetch Session Caching" section, it is
> > said
> > > > that
> > > > > > > > each
> > > > > > > > > > > broker
> > > > > > > > > > > > >>>>> has a
> > > > > > > > > > > > >>>>>>>> limited number of slots. How is this number
> > > > > > determined?
> > > > > > > > Does
> > > > > > > > > > > this
> > > > > > > > > > > > >>>>> require
> > > > > > > > > > > > >>>>>>>> a new broker config for this number?
> > > > > > > > > > > > >>>>>>> Good point.  I added two broker configuration
> > > > > > parameters to
> > > > > > > > > > > control
> > > > > > > > > > > > >>>>> this
> > > > > > > > > > > > >>>>>>> number.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>> I am curious to see whether we can avoid some of
> > > > these
> > > > > > new
> > > > > > > > > > > configs. For
> > > > > > > > > > > > >>>>>> example, incremental.fetch.session.
> > > > > > cache.slots.per.broker
> > > > > > > > is
> > > > > > > > > > > probably
> > > > > > > > > > > > >>>>> not
> > > > > > > > > > > > >>>>>> necessary because if a leader knows that a
> > > > FetchRequest
> > > > > > > > comes
> > > > > > > > > > > from a
> > > > > > > > > > > > >>>>>> follower, we probably want the leader to always
> > > > cache
> > > > > > the
> > > > > > > > > > > information
> > > > > > > > > > > > >>>>>> from that follower. Does this make sense?
> > > > > > > > > > > > >>>>> Yeah, maybe we can avoid having
> > > > > > > > > > > > >>>>> incremental.fetch.session.
> > cache.slots.per.broker.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>> Maybe we can discuss the config later after
> > there is
> > > > > > > > agreement
> > > > > > > > > > on
> > > > > > > > > > > how the
> > > > > > > > > > > > >>>>>> protocol would look like.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>>> What is the error code if broker does
> > > > > > > > > > > > >>>>>>>> not have new log for the incoming
> > FetchRequest?
> > > > > > > > > > > > >>>>>>> Hmm, is there a typo in this question?  Maybe
> > you
> > > > > > meant to
> > > > > > > > > ask
> > > > > > > > > > > what
> > > > > > > > > > > > >>>>>>> happens if there is no new cache slot for the
> > > > incoming
> > > > > > > > > > > FetchRequest?
> > > > > > > > > > > > >>>>>>> That's not an error-- the incremental fetch
> > > > session ID
> > > > > > just
> > > > > > > > > > gets
> > > > > > > > > > > set to
> > > > > > > > > > > > >>>>>>> 0, indicating no incremental fetch session was
> > > > created.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>> Yeah there is a typo. You have answered my
> > question.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>>> 5. Can you clarify what happens if follower
> > adds a
> > > > > > > > partition
> > > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > >>>>>>>> ReplicaFetcherThread after receiving
> > > > > > LeaderAndIsrRequest?
> > > > > > > > > Does
> > > > > > > > > > > leader
> > > > > > > > > > > > >>>>>>>> needs to generate a new session for this
> > > > > > > > > ReplicaFetcherThread
> > > > > > > > > > or
> > > > > > > > > > > > >>>>> does it
> > > > > > > > > > > > >>>>>>> re-use
> > > > > > > > > > > > >>>>>>>> the existing session?  If it uses a new
> > session,
> > > > is
> > > > > > the
> > > > > > > > old
> > > > > > > > > > > session
> > > > > > > > > > > > >>>>>>>> actively deleted from the slot?
> > > > > > > > > > > > >>>>>>> The basic idea is that you can't make changes,
> > > > except
> > > > > > by
> > > > > > > > > > sending
> > > > > > > > > > > a full
> > > > > > > > > > > > >>>>>>> fetch request.  However, perhaps we can allow
> > the
> > > > > > client to
> > > > > > > > > > > re-use its
> > > > > > > > > > > > >>>>>>> existing session ID.  If the client sets
> > sessionId
> > > > =
> > > > > > id,
> > > > > > > > > epoch
> > > > > > > > > > =
> > > > > > > > > > > 0, it
> > > > > > > > > > > > >>>>>>> could re-initialize the session.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>> Yeah I agree with the basic idea. We probably
> > want
> > > > to
> > > > > > > > > understand
> > > > > > > > > > > more
> > > > > > > > > > > > >>>>>> detail about how this works later.
> > > > > > > > > > > > >>>>> Sounds good.  I updated the KIP with this
> > > > information.  A
> > > > > > > > > > > > >>>>> re-initialization should be exactly the same as
> > an
> > > > > > > > > > initialization,
> > > > > > > > > > > > >>>>> except that it reuses an existing ID.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> best,
> > > > > > > > > > > > >>>>> Colin
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>>>> BTW, I think it may be useful if the KIP can
> > > > include
> > > > > > the
> > > > > > > > > > example
> > > > > > > > > > > > >>>>> workflow
> > > > > > > > > > > > >>>>>>>> of how this feature will be used in case of
> > > > partition
> > > > > > > > change
> > > > > > > > > > > and so
> > > > > > > > > > > > >>>>> on.
> > > > > > > > > > > > >>>>>>> Yeah, that might help.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> best,
> > > > > > > > > > > > >>>>>>> Colin
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > > > > >>>>>>>> Dong
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin
> > McCabe<
> > > > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > > > >>>>>>>> wrote:
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>>> I updated the KIP with the ideas we've been
> > > > > > discussing.
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>> best,
> > > > > > > > > > > > >>>>>>>>> Colin
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe
> > > > wrote:
> > > > > > > > > > > > >>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak
> > > > wrote:
> > > > > > > > > > > > >>>>>>>>>>> Hi Colin, thank you  for this KIP, it can
> > > > become a
> > > > > > > > really
> > > > > > > > > > > > >>>>> useful
> > > > > > > > > > > > >>>>>>> thing.
> > > > > > > > > > > > >>>>>>>>>>> I just scanned through the discussion so
> > far
> > > > and
> > > > > > wanted
> > > > > > > > > to
> > > > > > > > > > > > >>>>> start a
> > > > > > > > > > > > >>>>>>>>>>> thread to make as decision about keeping
> > the
> > > > > > > > > > > > >>>>>>>>>>> cache with the Connection / Session or
> > having
> > > > some
> > > > > > sort
> > > > > > > > > of
> > > > > > > > > > > UUID
> > > > > > > > > > > > >>>>>>> indN
> > > > > > > > > > > > >>>>>>>>> exed
> > > > > > > > > > > > >>>>>>>>>>> global Map.
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> Sorry if that has been settled already and
> > I
> > > > > > missed it.
> > > > > > > > > In
> > > > > > > > > > > this
> > > > > > > > > > > > >>>>>>> case
> > > > > > > > > > > > >>>>>>>>>>> could anyone point me to the discussion?
> > > > > > > > > > > > >>>>>>>>>> Hi Jan,
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>> I don't think anyone has discussed the idea
> > of
> > > > > > tying the
> > > > > > > > > > cache
> > > > > > > > > > > > >>>>> to an
> > > > > > > > > > > > >>>>>>>>>> individual TCP session yet.  I agree that
> > since
> > > > the
> > > > > > > > cache
> > > > > > > > > is
> > > > > > > > > > > > >>>>>>> intended to
> > > > > > > > > > > > >>>>>>>>>> be used only by a single follower or client,
> > > > it's an
> > > > > > > > > > > interesting
> > > > > > > > > > > > >>>>>>> thing
> > > > > > > > > > > > >>>>>>>>>> to think about.
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>> I guess the obvious disadvantage is that
> > > > whenever
> > > > > > your
> > > > > > > > TCP
> > > > > > > > > > > > >>>>> session
> > > > > > > > > > > > >>>>>>>>>> drops, you have to make a full fetch request
> > > > rather
> > > > > > than
> > > > > > > > > an
> > > > > > > > > > > > >>>>>>> incremental
> > > > > > > > > > > > >>>>>>>>>> one.  It's not clear to me how often this
> > > > happens in
> > > > > > > > > > practice
> > > > > > > > > > > --
> > > > > > > > > > > > >>>>> it
> > > > > > > > > > > > >>>>>>>>>> probably depends a lot on the quality of the
> > > > > > network.
> > > > > > > > > From
> > > > > > > > > > a
> > > > > > > > > > > > >>>>> code
> > > > > > > > > > > > >>>>>>>>>> perspective, it might also be a bit
> > difficult to
> > > > > > access
> > > > > > > > > data
> > > > > > > > > > > > >>>>>>> associated
> > > > > > > > > > > > >>>>>>>>>> with the Session from classes like KafkaApis
> > > > > > (although
> > > > > > > > we
> > > > > > > > > > > could
> > > > > > > > > > > > >>>>>>> refactor
> > > > > > > > > > > > >>>>>>>>>> it to make this easier).
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>> It's also clear that even if we tie the
> > cache
> > > > to the
> > > > > > > > > > session,
> > > > > > > > > > > we
> > > > > > > > > > > > >>>>>>> still
> > > > > > > > > > > > >>>>>>>>>> have to have limits on the number of caches
> > > > we're
> > > > > > > > willing
> > > > > > > > > to
> > > > > > > > > > > > >>>>> create.
> > > > > > > > > > > > >>>>>>>>>> And probably we should reserve some cache
> > slots
> > > > for
> > > > > > each
> > > > > > > > > > > > >>>>> follower, so
> > > > > > > > > > > > >>>>>>>>>> that clients don't take all of them.
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> Id rather see a protocol in which the
> > client is
> > > > > > hinting
> > > > > > > > > the
> > > > > > > > > > > > >>>>> broker
> > > > > > > > > > > > >>>>>>>>> that,
> > > > > > > > > > > > >>>>>>>>>>> he is going to use the feature instead of a
> > > > client
> > > > > > > > > > > > >>>>>>>>>>> realizing that the broker just offered the
> > > > feature
> > > > > > > > > > > (regardless
> > > > > > > > > > > > >>>>> of
> > > > > > > > > > > > >>>>>>>>>>> protocol version which should only indicate
> > > > that
> > > > > > the
> > > > > > > > > > feature
> > > > > > > > > > > > >>>>>>>>>>> would be usable).
> > > > > > > > > > > > >>>>>>>>>> Hmm.  I'm not sure what you mean by
> > "hinting."
> > > > I do
> > > > > > > > think
> > > > > > > > > > > that
> > > > > > > > > > > > >>>>> the
> > > > > > > > > > > > >>>>>>>>>> server should have the option of not
> > accepting
> > > > > > > > incremental
> > > > > > > > > > > > >>>>> requests
> > > > > > > > > > > > >>>>>>> from
> > > > > > > > > > > > >>>>>>>>>> specific clients, in order to save memory
> > space.
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> This seems to work better with a per
> > > > > > > > > > > > >>>>>>>>>>> connection/session attached Metadata than
> > with
> > > > a
> > > > > > Map
> > > > > > > > and
> > > > > > > > > > > could
> > > > > > > > > > > > >>>>>>> allow
> > > > > > > > > > > > >>>>>>>>> for
> > > > > > > > > > > > >>>>>>>>>>> easier client implementations.
> > > > > > > > > > > > >>>>>>>>>>> It would also make Client-side code easier
> > as
> > > > there
> > > > > > > > > > wouldn't
> > > > > > > > > > > > >>>>> be any
> > > > > > > > > > > > >>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > > > > > > > > >>>>>>>>>> It is nice not to have to handle cache-miss
> > > > > > responses, I
> > > > > > > > > > > agree.
> > > > > > > > > > > > >>>>>>>>>> However, TCP sessions aren't exposed to
> > most of
> > > > our
> > > > > > > > > > > client-side
> > > > > > > > > > > > >>>>> code.
> > > > > > > > > > > > >>>>>>>>>> For example, when the Producer creates a
> > > > message and
> > > > > > > > hands
> > > > > > > > > > it
> > > > > > > > > > > > >>>>> off to
> > > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > > >>>>>>>>>> NetworkClient, the NC will transparently
> > > > re-connect
> > > > > > and
> > > > > > > > > > > re-send a
> > > > > > > > > > > > >>>>>>>>>> message if the first send failed.  The
> > > > higher-level
> > > > > > code
> > > > > > > > > > will
> > > > > > > > > > > > >>>>> not be
> > > > > > > > > > > > >>>>>>>>>> informed about whether the TCP session was
> > > > > > > > re-established,
> > > > > > > > > > > > >>>>> whether an
> > > > > > > > > > > > >>>>>>>>>> existing TCP session was used, and so on.
> > So
> > > > > > overall I
> > > > > > > > > > would
> > > > > > > > > > > > >>>>> still
> > > > > > > > > > > > >>>>>>> lean
> > > > > > > > > > > > >>>>>>>>>> towards not coupling this to the TCP
> > session...
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>> best,
> > > > > > > > > > > > >>>>>>>>>> Colin
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>     Thank you again for the KIP. And
> > again, if
> > > > > > this was
> > > > > > > > > > > clarified
> > > > > > > > > > > > >>>>>>> already
> > > > > > > > > > > > >>>>>>>>>>> please drop me a hint where I could read
> > about
> > > > it.
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> Best Jan
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > > > > > >>>>>>>>>>>> Hi all,
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>> I created a KIP to improve the
> > scalability and
> > > > > > latency
> > > > > > > > > of
> > > > > > > > > > > > >>>>>>>>> FetchRequest:
> > > > > > > > > > > > >>>>>>>>>>>> https://cwiki.apache.org/
> > > > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > > > >>>>>>>>> 227%3A+Introduce+Incremental+
> > > > > > FetchRequests+to+Increase+
> > > > > > > > > > > > >>>>>>>>> Partition+Scalability
> > > > > > > > > > > > >>>>>>>>>>>> Please take a look.
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>> cheers,
> > > > > > > > > > > > >>>>>>>>>>>> Colin
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> >

Mime
View raw message