kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Becket Qin <becket....@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Tue, 12 Dec 2017 19:48:04 GMT
Hi Colin,

I am not completely sure, but I am hoping that when we do
FileChannel.transferTo() the OS will just use a fixed buffer to transfer
the data to the socket channel without polluting the page cache. But this
might not be true if we are using SSL.

The point I want to make is that avoiding doing binary search on index file
and avoid reading the log segments during fetch has some additional
benefits. So if the solution works for the current KIP, it might be a
better choice.

Regarding the fixed session for the entire life of the clients, it may be
also related to another issue we want to solve with broker epoch in
KAFKA-6029. If we can make sure the session id will not change along the
life time of clients, we can use that session id instead of creating a
separate broker epoch and add that to the FetchRequest.

Thanks,

Jiangjie (Becket) Qin



On Mon, Dec 11, 2017 at 3:25 PM, Colin McCabe <cmccabe@apache.org> wrote:

> On Mon, Dec 11, 2017, at 14:51, Becket Qin wrote:
> > Hi Jun,
> >
> > Yes, I agree avoiding reading the log segment is not the primary goal for
> > this KIP. I brought this up because recently I saw a significant
> > throughput
> > impact when a broker is down for 20 - 30 min and rejoins a cluster. The
> > bytes in rate could drop by 50% when that broker is trying to catch up
> > with
> > the leaders even in a big cluster (a single broker should not have such
> > big
> > impact on the entire cluster).
>
> Hi Becket,
>
> It sounds like the broker was fetching older data which wasn't in the
> page cache?  That sounds like it could definitely have a negative impact
> on the cluster.  It is a little troubling if the impact is a 50% drop in
> throughput, though.
>
> It's a little unclear how to mitigate this, since old data is definitely
> not going to be in memory.  Maybe we need to work on making sure that
> slow fetches going on by one fetcher do not slow down all the other
> worker threads...?
>
> > And some users also reported such cascading
> > degradation, i.e. when one consumer lags behind, the other consumers will
> > also start to lag behind. So I think addressing this is an important
> > improvement. I will run some test and see if returning at index boundary
> > to avoid the log scan would help address this issue. That being said, I
> > agree that we don't have to address this issue in this KIP. I can submit
> > another KIP later if avoiding the log segment scan helps.
>
> Thanks, that's really interesting.
>
> I agree that it might be better in a follow-on KIP.
>
> Is the goal to improve the cold-cache case?  Maybe avoid looking at the
> index file altogether (except for the initial setup)?  That would be a
> nice improvement for consumers fetching big sequential chunks of
> historic data.
>
> regards,
> Colin
>
>
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin <lindong28@gmail.com> wrote:
> >
> > > Hey Colin,
> > >
> > > I went over the latest KIP wiki and have a few comments here.
> > >
> > > 1) The KIP says that client ID is a string if the session belongs to a
> > > Kafka consumer. And it is a numerical follower Id if the session
> belongs to
> > > a follower. Can we have a consistent type for the client Id?
> > >
> > > 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> > > broker". If the broker has multiple replica fetcher thread, do they all
> > > have the same follower Id in teh leader broker?
> > >
> > > 3) One of the condition for evicting an existing session is that "The
> new
> > > session belongs to a follower, and the existing session belongs to a
> > > regular consumer". I am not sure the session from follower should also
> be
> > > restricted by the newly added config. It seems that we will always
> create
> > > lots for FetchRequest from follower brokers. Maybe the
> > > "max.incremental.fetch.session.cache.slots" should only be applies if
> the
> > > FetchRequest comes from a client consumer?
> > >
> > > 4) Not sure I fully understand how the "The last dirty sequence
> number" is
> > > used. It is mentioned that "Let P1 have a last dirty sequence number of
> > > 100, and P2 have a last dirty sequence number of 101. An incremental
> fetch
> > > request with sequence number 100 will return information about both P1
> and
> > > P2." But would be the fetch offset for P2 in this case, if the last
> fetch
> > > offset stored in the Fetch Session for P2 is associated with the last
> dirty
> > > sequence number 101 for P2? My gut feel is that you would have to
> stored
> > > the fetch offset for sequence number 100 for P2 as well. Did I miss
> > > something here?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin <becket.qin@gmail.com>
> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > I see. Yes, that makes sense. Are we going to do that only for the
> > > fetches
> > > > whose per partition fetch size cannot reach the first index entry
> after
> > > the
> > > > fetch position, or are we going to do that for any fetch? If we do
> that
> > > for
> > > > any fetch, then we will still need to read the actual log segment,
> which
> > > > could be expensive if the data is no longer in the cache. This hurts
> > > > performance if some fetches are on the old log segments.
> > > >
> > > > I took a quick look on the clusters we have. The idle topic ratio
> varies
> > > > depending on the usage of the cluster. For our metric cluster and
> > > database
> > > > replication clusters almost all the topics are actively used. For
> > > tracking
> > > > clusters, ~70% topics have data coming in at different rate. For
> other
> > > > clusters such as queuing and data deployment. There are more idle
> topics
> > > > and the traffic is more bursty (I don't have the exact number here).
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe <cmccabe@apache.org>
> > > wrote:
> > > >
> > > > > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > > > > Hi, Jiangjie,
> > > > > >
> > > > > > What I described is almost the same as yours. The only extra
> thing is
> > > > to
> > > > > > scan the log segment from the identified index entry a bit more
> to
> > > > find a
> > > > > > file position that ends at a message set boundary and is less
> than
> > > the
> > > > > > partition level fetch size. This way, we still preserve the
> current
> > > > > > semantic of not returning more bytes than fetch size unless
> there is
> > > a
> > > > > > single message set larger than the fetch size.
> > > > > >
> > > > > > In a typically cluster at LinkedIn, what's the percentage of idle
> > > > > > partitions?
> > > > >
> > > > > Yeah, that would be a great number to get.
> > > > >
> > > > > Of course, KIP-227 will also benefit partitions that are not
> completely
> > > > > idle.  For instance, a partition that's getting just one message a
> > > > > second will appear in many fetch requests, unless every other
> partition
> > > > > in the system is also only getting a low rate of incoming messages.
> > > > >
> > > > > regards,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin <becket.qin@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Yes, we still need to handle the corner case. And you are
> right, it
> > > > is
> > > > > all
> > > > > > > about trade-off between simplicity and the performance gain.
> > > > > > >
> > > > > > > I was thinking that the brokers always return at least
> > > > > > > log.index.interval.bytes per partition to the consumer, just
> like
> > > we
> > > > > will
> > > > > > > return at least one message to the user. This way we don't
> need to
> > > > > worry
> > > > > > > about the case that the fetch size is smaller than the index
> > > > interval.
> > > > > We
> > > > > > > may just need to let users know this behavior change.
> > > > > > >
> > > > > > > Not sure if I completely understand your solution, but I think
> we
> > > are
> > > > > > > thinking about the same. i.e. for the first fetch asking for
> offset
> > > > > x0, we
> > > > > > > will need to do a binary search to find the position p0. and
> then
> > > the
> > > > > > > broker will iterate over the index entries starting from the
> first
> > > > > index
> > > > > > > entry whose offset is greater than p0 until it reaches the
> index
> > > > > entry(x1,
> > > > > > > p1) so that p1 - p0 is just under the fetch size, but the next
> > > entry
> > > > > will
> > > > > > > exceed the fetch size. We then return the bytes from p0 to p1.
> > > > > Meanwhile
> > > > > > > the broker caches the next fetch (x1, p1). So when the next
> fetch
> > > > > comes, it
> > > > > > > will just iterate over the offset index entry starting at (x1,
> p1).
> > > > > > >
> > > > > > > It is true that in the above approach, the log compacted topic
> > > needs
> > > > > to be
> > > > > > > handled. It seems that this can be solved by checking whether
> the
> > > > > cached
> > > > > > > index and the new log index are still the same index object. If
> > > they
> > > > > are
> > > > > > > not the same, we can fall back to binary search with the cached
> > > > > offset. It
> > > > > > > is admittedly more complicated than the current logic, but
> given
> > > the
> > > > > binary
> > > > > > > search logic already exists, it seems the additional object
> sanity
> > > > > check is
> > > > > > > not too much work.
> > > > > > >
> > > > > > > Not sure if the above implementation is simple enough to
> justify
> > > the
> > > > > > > performance improvement. Let me know if you see potential
> > > complexity.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 6, 2017 at 4:48 PM, Jun Rao <jun@confluent.io>
> wrote:
> > > > > > >
> > > > > > > > Hi, Becket,
> > > > > > > >
> > > > > > > > Yes, I agree that it's rare to have the fetch size smaller
> than
> > > > index
> > > > > > > > interval. It's just that we still need additional code to
> handle
> > > > the
> > > > > rare
> > > > > > > > case.
> > > > > > > >
> > > > > > > > If you go this far, a more general approach (i.e., without
> > > > returning
> > > > > at
> > > > > > > the
> > > > > > > > index boundary) is the following. We can cache the following
> > > > > metadata for
> > > > > > > > the next fetch offset: the file position in the log segment,
> the
> > > > > first
> > > > > > > > index slot at or after the file position. When serving a
> fetch
> > > > > request,
> > > > > > > we
> > > > > > > > scan the index entries from the cached index slot until we
> hit
> > > the
> > > > > fetch
> > > > > > > > size. We can then send the data at the message set boundary
> and
> > > > > update
> > > > > > > the
> > > > > > > > cached metadata for the next fetch offset. This is kind of
> > > > > complicated,
> > > > > > > but
> > > > > > > > probably not more than your approach if the corner case has
> to be
> > > > > > > handled.
> > > > > > > >
> > > > > > > > In both the above approach and your approach, we need the
> > > > additional
> > > > > > > logic
> > > > > > > > to handle compacted topic since a log segment (and therefore
> its
> > > > > index)
> > > > > > > can
> > > > > > > > be replaced between two consecutive fetch requests.
> > > > > > > >
> > > > > > > > Overall, I agree that the general approach that you proposed
> > > > applies
> > > > > more
> > > > > > > > widely since we get the benefit even when all topics are high
> > > > volume.
> > > > > > > It's
> > > > > > > > just that it would be better if we could think of a simpler
> > > > > > > implementation.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin <
> becket.qin@gmail.com
> > > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > That is true, but in reality it seems rare that the fetch
> size
> > > is
> > > > > > > smaller
> > > > > > > > > than index interval. In the worst case, we may need to do
> > > another
> > > > > look
> > > > > > > > up.
> > > > > > > > > In the future, when we have the mechanism to inform the
> clients
> > > > > about
> > > > > > > the
> > > > > > > > > broker configurations, the clients may want to configure
> > > > > > > correspondingly
> > > > > > > > as
> > > > > > > > > well, e.g. max message size, max timestamp difference, etc.
> > > > > > > > >
> > > > > > > > > On the other hand, we are not guaranteeing that the
> returned
> > > > bytes
> > > > > in a
> > > > > > > > > partition is always bounded by the per partition fetch
> size,
> > > > > because we
> > > > > > > > are
> > > > > > > > > going to return at least one message, so the per partition
> > > fetch
> > > > > size
> > > > > > > > seems
> > > > > > > > > already a soft limit. Since we are introducing a new fetch
> > > > > protocol and
> > > > > > > > > this is related, it might be worth considering this option.
> > > > > > > > >
> > > > > > > > > BTW, one reason I bring this up again was because
> yesterday we
> > > > had
> > > > > a
> > > > > > > > > presentation from Uber regarding the end to end latency.
> And
> > > they
> > > > > are
> > > > > > > > > seeing this binary search behavior impacting the latency
> due to
> > > > > page
> > > > > > > > in/out
> > > > > > > > > of the index file.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao <jun@confluent.io>
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Jiangjie,
> > > > > > > > > >
> > > > > > > > > > Not sure returning the fetch response at the index
> boundary
> > > is
> > > > a
> > > > > > > > general
> > > > > > > > > > solution. The index interval is configurable. If one
> > > configures
> > > > > the
> > > > > > > > index
> > > > > > > > > > interval larger than the per partition fetch size, we
> > > probably
> > > > > have
> > > > > > > to
> > > > > > > > > > return data not at the index boundary.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin <
> > > > becket.qin@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Colin,
> > > > > > > > > > >
> > > > > > > > > > > Thinking about this again. I do see the reason that we
> want
> > > > to
> > > > > > > have a
> > > > > > > > > > epoch
> > > > > > > > > > > to avoid out of order registration of the interested
> set.
> > > But
> > > > > I am
> > > > > > > > > > > wondering if the following semantic would meet what we
> want
> > > > > better:
> > > > > > > > > > >  - Session Id: the id assigned to a single client for
> life
> > > > long
> > > > > > > time.
> > > > > > > > > i.e
> > > > > > > > > > > it does not change when the interested partitions
> change.
> > > > > > > > > > >  - Epoch: the interested set epoch. Only updated when a
> > > full
> > > > > fetch
> > > > > > > > > > request
> > > > > > > > > > > comes, which may result in the interested partition set
> > > > change.
> > > > > > > > > > > This will ensure that the registered interested set
> will
> > > > > always be
> > > > > > > > the
> > > > > > > > > > > latest registration. And the clients can change the
> > > > interested
> > > > > > > > > partition
> > > > > > > > > > > set without creating another session.
> > > > > > > > > > >
> > > > > > > > > > > Also I want to bring up the way the leader respond to
> the
> > > > > > > > FetchRequest
> > > > > > > > > > > again. I think it would be a big improvement if we just
> > > > return
> > > > > the
> > > > > > > > > > > responses at index entry boundary or log end. There
> are a
> > > few
> > > > > > > > benefits:
> > > > > > > > > > > 1. The leader does not need the follower to provide the
> > > > > offsets,
> > > > > > > > > > > 2. The fetch requests no longer need to do a binary
> search
> > > on
> > > > > the
> > > > > > > > > index,
> > > > > > > > > > it
> > > > > > > > > > > just need to do a linear access to the index file,
> which is
> > > > > much
> > > > > > > > cache
> > > > > > > > > > > friendly.
> > > > > > > > > > >
> > > > > > > > > > > Assuming the leader can get the last returned offsets
> to
> > > the
> > > > > > > clients
> > > > > > > > > > > cheaply, I am still not sure why it is necessary for
> the
> > > > > followers
> > > > > > > to
> > > > > > > > > > > repeat the offsets in the incremental fetch every time.
> > > > > Intuitively
> > > > > > > > it
> > > > > > > > > > > should only update the offsets when the leader has
> wrong
> > > > > offsets,
> > > > > > > in
> > > > > > > > > most
> > > > > > > > > > > cases, the incremental fetch request should just be
> empty.
> > > > > > > Otherwise
> > > > > > > > we
> > > > > > > > > > may
> > > > > > > > > > > not be saving much when there are continuous small
> requests
> > > > > going
> > > > > > > to
> > > > > > > > > each
> > > > > > > > > > > partition, which could be normal for some low latency
> > > > systems.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe <
> > > > > cmccabe@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > > > > > > > Hi Colin
> > > > > > > > > > > > >
> > > > > > > > > > > > > Addressing the topic of how to manage slots from
> the
> > > > other
> > > > > > > > thread.
> > > > > > > > > > > > > With tcp connections all this comes for free
> > > essentially.
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > >
> > > > > > > > > > > > I don't think that it's accurate to say that cache
> > > > management
> > > > > > > > "comes
> > > > > > > > > > for
> > > > > > > > > > > > free" by coupling the incremental fetch session with
> the
> > > > TCP
> > > > > > > > session.
> > > > > > > > > > > > When a new TCP session is started by a fetch
> request, you
> > > > > still
> > > > > > > > have
> > > > > > > > > to
> > > > > > > > > > > > decide whether to grant that request an incremental
> fetch
> > > > > session
> > > > > > > > or
> > > > > > > > > > > > not.  If your answer is that you always grant the
> > > request,
> > > > I
> > > > > > > would
> > > > > > > > > > argue
> > > > > > > > > > > > that you do not have cache management.
> > > > > > > > > > > >
> > > > > > > > > > > > I guess you could argue that timeouts are cache
> > > management,
> > > > > but I
> > > > > > > > > don't
> > > > > > > > > > > > find that argument persuasive.  Anyone could just
> create
> > > a
> > > > > lot of
> > > > > > > > TCP
> > > > > > > > > > > > sessions and use a lot of resources, in that case.
> So
> > > > there
> > > > > is
> > > > > > > > > > > > essentially no limit on memory use.  In any case, TCP
> > > > > sessions
> > > > > > > > don't
> > > > > > > > > > > > help us implement fetch session timeouts.
> > > > > > > > > > > >
> > > > > > > > > > > > > I still would argue we disable it by default and
> make a
> > > > > flag in
> > > > > > > > the
> > > > > > > > > > > > > broker to ask the leader to maintain the cache
> while
> > > > > > > replicating
> > > > > > > > > and
> > > > > > > > > > > > also only
> > > > > > > > > > > > > have it optional in consumers (default to off) so
> one
> > > can
> > > > > turn
> > > > > > > it
> > > > > > > > > on
> > > > > > > > > > > > > where it really hurts.  MirrorMaker and audit
> consumers
> > > > > > > > > prominently.
> > > > > > > > > > > >
> > > > > > > > > > > > I agree with Jason's point from earlier in the
> thread.
> > > > > Adding
> > > > > > > > extra
> > > > > > > > > > > > configuration knobs that aren't really necessary can
> harm
> > > > > > > > usability.
> > > > > > > > > > > > Certainly asking people to manually turn on a feature
> > > > "where
> > > > > it
> > > > > > > > > really
> > > > > > > > > > > > hurts" seems to fall in that category, when we could
> > > easily
> > > > > > > enable
> > > > > > > > it
> > > > > > > > > > > > automatically for them.
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Otherwise I left a few remarks in-line, which
> should
> > > help
> > > > > to
> > > > > > > > > > understand
> > > > > > > > > > > > > my view of the situation better
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best Jan
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak
> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > > > > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin
> wrote:
> > > > > > > > > > > > > >>>> Thanks for the explanation, Colin. A few more
> > > > > questions.
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>> The session epoch is not complex.  It's just
> a
> > > > number
> > > > > > > which
> > > > > > > > > > > > increments
> > > > > > > > > > > > > >>>>> on each incremental fetch.  The session
> epoch is
> > > > also
> > > > > > > > useful
> > > > > > > > > > for
> > > > > > > > > > > > > >>>>> debugging-- it allows you to match up
> requests
> > > and
> > > > > > > > responses
> > > > > > > > > > when
> > > > > > > > > > > > > >>>>> looking at log files.
> > > > > > > > > > > > > >>>> Currently each request in Kafka has a
> correlation
> > > id
> > > > > to
> > > > > > > help
> > > > > > > > > > match
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>>> requests and responses. Is epoch doing
> something
> > > > > > > > differently?
> > > > > > > > > > > > > >>> Hi Becket,
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> The correlation ID is used within a single TCP
> > > > > session, to
> > > > > > > > > > uniquely
> > > > > > > > > > > > > >>> associate a request with a response.  The
> > > correlation
> > > > > ID is
> > > > > > > > not
> > > > > > > > > > > > unique
> > > > > > > > > > > > > >>> (and has no meaning) outside the context of
> that
> > > > > single TCP
> > > > > > > > > > > session.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Keep in mind, NetworkClient is in charge of TCP
> > > > > sessions,
> > > > > > > and
> > > > > > > > > > > > generally
> > > > > > > > > > > > > >>> tries to hide that information from the upper
> > > layers
> > > > > of the
> > > > > > > > > code.
> > > > > > > > > > > So
> > > > > > > > > > > > > >>> when you submit a request to NetworkClient, you
> > > don't
> > > > > know
> > > > > > > if
> > > > > > > > > > that
> > > > > > > > > > > > > >>> request creates a TCP session, or reuses an
> > > existing
> > > > > one.
> > > > > > > > > > > > > >>>>> Unfortunately, this doesn't work.  Imagine
> the
> > > > client
> > > > > > > > misses
> > > > > > > > > an
> > > > > > > > > > > > > >>>>> increment fetch response about a partition.
> And
> > > > > then the
> > > > > > > > > > > > partition is
> > > > > > > > > > > > > >>>>> never updated after that.  The client has no
> way
> > > to
> > > > > know
> > > > > > > > > about
> > > > > > > > > > > the
> > > > > > > > > > > > > >>>>> partition, since it won't be included in any
> > > future
> > > > > > > > > incremental
> > > > > > > > > > > > fetch
> > > > > > > > > > > > > >>>>> responses.  And there are no offsets to
> compare,
> > > > > since
> > > > > > > the
> > > > > > > > > > > > partition is
> > > > > > > > > > > > > >>>>> simply omitted from the response.
> > > > > > > > > > > > > >>>> I am curious about in which situation would
> the
> > > > > follower
> > > > > > > > miss
> > > > > > > > > a
> > > > > > > > > > > > response
> > > > > > > > > > > > > >>>> of a partition. If the entire FetchResponse is
> > > lost
> > > > > (e.g.
> > > > > > > > > > > timeout),
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>>> follower would disconnect and retry. That will
> > > > result
> > > > > in
> > > > > > > > > > sending a
> > > > > > > > > > > > full
> > > > > > > > > > > > > >>>> FetchRequest.
> > > > > > > > > > > > > >>> Basically, you are proposing that we rely on
> TCP
> > > for
> > > > > > > reliable
> > > > > > > > > > > > delivery
> > > > > > > > > > > > > >>> in a distributed system.  That isn't a good
> idea
> > > for
> > > > a
> > > > > > > bunch
> > > > > > > > of
> > > > > > > > > > > > > >>> different reasons.  First of all, TCP timeouts
> tend
> > > > to
> > > > > be
> > > > > > > > very
> > > > > > > > > > > > long.  So
> > > > > > > > > > > > > >>> if the TCP session timing out is your error
> > > detection
> > > > > > > > > mechanism,
> > > > > > > > > > > you
> > > > > > > > > > > > > >>> have to wait minutes for messages to timeout.
> Of
> > > > > course,
> > > > > > > we
> > > > > > > > > add
> > > > > > > > > > a
> > > > > > > > > > > > > >>> timeout on top of that after which we declare
> the
> > > > > > > connection
> > > > > > > > > bad
> > > > > > > > > > > and
> > > > > > > > > > > > > >>> manually close it.  But just because the
> session is
> > > > > closed
> > > > > > > on
> > > > > > > > > one
> > > > > > > > > > > end
> > > > > > > > > > > > > >>> doesn't mean that the other end knows that it
> is
> > > > > closed.
> > > > > > > So
> > > > > > > > > the
> > > > > > > > > > > > leader
> > > > > > > > > > > > > >>> may have to wait quite a long time before TCP
> > > decides
> > > > > that
> > > > > > > > yes,
> > > > > > > > > > > > > >>> connection X from the follower is dead and not
> > > coming
> > > > > back,
> > > > > > > > > even
> > > > > > > > > > > > though
> > > > > > > > > > > > > >>> gremlins ate the FIN packet which the follower
> > > > > attempted to
> > > > > > > > > > > > translate.
> > > > > > > > > > > > > >>> If the cache state is tied to that TCP
> session, we
> > > > > have to
> > > > > > > > keep
> > > > > > > > > > > that
> > > > > > > > > > > > > >>> cache around for a much longer time than we
> should.
> > > > > > > > > > > > > >> Hi,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> I see this from a different perspective. The
> cache
> > > > > expiry
> > > > > > > time
> > > > > > > > > > > > > >> has the same semantic as idle connection time in
> > > this
> > > > > > > > scenario.
> > > > > > > > > > > > > >> It is the time range we expect the client to
> come
> > > back
> > > > > an
> > > > > > > > reuse
> > > > > > > > > > > > > >> its broker side state. I would argue that on
> close
> > > we
> > > > > would
> > > > > > > > get
> > > > > > > > > an
> > > > > > > > > > > > > >> extra shot at cleaning up the session state
> early.
> > > As
> > > > > > > opposed
> > > > > > > > to
> > > > > > > > > > > > > >> always wait for that duration for expiry to
> happen.
> > > > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The idea here is that the incremental fetch cache
> > > > expiry
> > > > > time
> > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > > > > much shorter than the TCP session timeout.  In
> > > general
> > > > > the
> > > > > > > TCP
> > > > > > > > > > > session
> > > > > > > > > > > > > > timeout is common to all TCP connections, and
> very
> > > > > long.  To
> > > > > > > > make
> > > > > > > > > > > these
> > > > > > > > > > > > > > numbers a little more concrete, the TCP session
> > > timeout
> > > > > is
> > > > > > > > often
> > > > > > > > > > > > > > configured to be 2 hours on Linux.  (See
> > > > > > > > > > > > > > https://www.cyberciti.biz/
> tips/linux-increasing-or-
> > > > > > > > > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > > > > > > > > > )  The timeout I was proposing for incremental
> fetch
> > > > > sessions
> > > > > > > > was
> > > > > > > > > > one
> > > > > > > > > > > > or
> > > > > > > > > > > > > > two minutes at most.
> > > > > > > > > > > > > Currently this is taken care of by
> > > > > > > > > > > > > connections.max.idle.ms on the broker and
> defaults to
> > > > > > > something
> > > > > > > > of
> > > > > > > > > > few
> > > > > > > > > > > > > minutes.
> > > > > > > > > > > >
> > > > > > > > > > > > It is 10 minutes by default, which is longer than
> what we
> > > > > want
> > > > > > > the
> > > > > > > > > > > > incremental fetch session timeout to be.  There's no
> > > reason
> > > > > to
> > > > > > > > couple
> > > > > > > > > > > > these two things.
> > > > > > > > > > > >
> > > > > > > > > > > > > Also something we could let the client change if we
> > > > really
> > > > > > > wanted
> > > > > > > > > to.
> > > > > > > > > > > > > So there is no need to worry about coupling our
> > > > > implementation
> > > > > > > to
> > > > > > > > > > some
> > > > > > > > > > > > > timeouts given by the OS, with TCP one always has
> full
> > > > > control
> > > > > > > > over
> > > > > > > > > > the
> > > > > > > > > > > > worst
> > > > > > > > > > > > > times + one gets the extra shot cleaning up early
> when
> > > > the
> > > > > > > close
> > > > > > > > > > comes
> > > > > > > > > > > > through.
> > > > > > > > > > > > > Which is the majority of the cases.
> > > > > > > > > > > >
> > > > > > > > > > > > In the majority of cases, the TCP session will be
> > > > > re-established.
> > > > > > > > In
> > > > > > > > > > > > that case, we have to send a full fetch request
> rather
> > > than
> > > > > an
> > > > > > > > > > > > incremental fetch request.
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>> Secondly, from a software engineering
> perspective,
> > > > it's
> > > > > > > not a
> > > > > > > > > > good
> > > > > > > > > > > > idea
> > > > > > > > > > > > > >>> to try to tightly tie together TCP and our
> code.
> > > We
> > > > > would
> > > > > > > > have
> > > > > > > > > > to
> > > > > > > > > > > > > >>> rework how we interact with NetworkClient so
> that
> > > we
> > > > > are
> > > > > > > > aware
> > > > > > > > > of
> > > > > > > > > > > > things
> > > > > > > > > > > > > >>> like TCP sessions closing or opening.  We would
> > > have
> > > > > to be
> > > > > > > > > > careful
> > > > > > > > > > > > > >>> preserve the ordering of incoming messages when
> > > doing
> > > > > > > things
> > > > > > > > > like
> > > > > > > > > > > > > >>> putting incoming requests on to a queue to be
> > > > > processed by
> > > > > > > > > > multiple
> > > > > > > > > > > > > >>> threads.  It's just a lot of complexity to
> add, and
> > > > > there's
> > > > > > > > no
> > > > > > > > > > > > upside.
> > > > > > > > > > > > > >> I see the point here. And I had a small chat
> with
> > > Dong
> > > > > Lin
> > > > > > > > > already
> > > > > > > > > > > > > >> making me aware of this. I tried out the
> approaches
> > > > and
> > > > > > > > propose
> > > > > > > > > > the
> > > > > > > > > > > > > >> following:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> The client start and does a full fetch. It then
> does
> > > > > > > > incremental
> > > > > > > > > > > > fetches.
> > > > > > > > > > > > > >> The connection to the broker dies and is
> > > > re-established
> > > > > by
> > > > > > > > > > > > NetworkClient
> > > > > > > > > > > > > >> under the hood.
> > > > > > > > > > > > > >> The broker sees an incremental fetch without
> having
> > > > > state =>
> > > > > > > > > > returns
> > > > > > > > > > > > > >> error:
> > > > > > > > > > > > > >> Client sees the error, does a full fetch and
> goes
> > > back
> > > > > to
> > > > > > > > > > > > incrementally
> > > > > > > > > > > > > >> fetching.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> having this 1 additional error round trip is
> > > > > essentially the
> > > > > > > > > same
> > > > > > > > > > as
> > > > > > > > > > > > > >> when something
> > > > > > > > > > > > > >> with the sessions or epoch changed unexpectedly
> to
> > > the
> > > > > > > client
> > > > > > > > > (say
> > > > > > > > > > > > > >> expiry).
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> So its nothing extra added but the conditions
> are
> > > > > easier to
> > > > > > > > > > > evaluate.
> > > > > > > > > > > > > >> Especially since we do everything with
> > > NetworkClient.
> > > > > Other
> > > > > > > > > > > > implementers
> > > > > > > > > > > > > >> on the
> > > > > > > > > > > > > >> protocol are free to optimizes this and do not
> do
> > > the
> > > > > > > > errornours
> > > > > > > > > > > > > >> roundtrip on the
> > > > > > > > > > > > > >> new connection.
> > > > > > > > > > > > > >> Its a great plus that the client can know when
> the
> > > > > error is
> > > > > > > > > gonna
> > > > > > > > > > > > > >> happen. instead of
> > > > > > > > > > > > > >> the server to always have to report back if
> > > something
> > > > > > > changes
> > > > > > > > > > > > > >> unexpectedly for the client
> > > > > > > > > > > > > > You are assuming that the leader and the follower
> > > agree
> > > > > that
> > > > > > > > the
> > > > > > > > > > TCP
> > > > > > > > > > > > > > session drops at the same time.  When there are
> > > network
> > > > > > > > problems,
> > > > > > > > > > > this
> > > > > > > > > > > > > > may not be true.  The leader may still think the
> > > > > previous TCP
> > > > > > > > > > session
> > > > > > > > > > > > is
> > > > > > > > > > > > > > active.  In that case, we have to keep the
> > > incremental
> > > > > fetch
> > > > > > > > > > session
> > > > > > > > > > > > > > state around until we learn otherwise (which
> could be
> > > > up
> > > > > to
> > > > > > > > that
> > > > > > > > > 2
> > > > > > > > > > > hour
> > > > > > > > > > > > > > timeout I mentioned).  And if we get a new
> incoming
> > > > > > > incremental
> > > > > > > > > > fetch
> > > > > > > > > > > > > > request, we can't assume that it replaces the
> > > previous
> > > > > one,
> > > > > > > > > because
> > > > > > > > > > > the
> > > > > > > > > > > > > > IDs will be different (the new one starts a new
> > > > session).
> > > > > > > > > > > > > As mentioned, no reason to fear some time-outs out
> of
> > > our
> > > > > > > control
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>> Imagine that I made an argument that client
> IDs are
> > > > > > > "complex"
> > > > > > > > > and
> > > > > > > > > > > > should
> > > > > > > > > > > > > >>> be removed from our APIs.  After all, we can
> just
> > > > look
> > > > > at
> > > > > > > the
> > > > > > > > > > > remote
> > > > > > > > > > > > IP
> > > > > > > > > > > > > >>> address and TCP port of each connection.
> Would you
> > > > > think
> > > > > > > > that
> > > > > > > > > > was
> > > > > > > > > > > a
> > > > > > > > > > > > > >>> good idea?  The client ID is useful when
> looking at
> > > > > logs.
> > > > > > > > For
> > > > > > > > > > > > example,
> > > > > > > > > > > > > >>> if a rebalance is having problems, you want to
> know
> > > > > what
> > > > > > > > > clients
> > > > > > > > > > > were
> > > > > > > > > > > > > >>> having a problem.  So having the client ID
> field to
> > > > > guide
> > > > > > > you
> > > > > > > > > is
> > > > > > > > > > > > > >>> actually much less "complex" in practice than
> not
> > > > > having an
> > > > > > > > ID.
> > > > > > > > > > > > > >> I still cant follow why the correlation idea
> will
> > > not
> > > > > help
> > > > > > > > here.
> > > > > > > > > > > > > >> Correlating logs with it usually works great.
> Even
> > > > with
> > > > > > > > > primitive
> > > > > > > > > > > > tools
> > > > > > > > > > > > > >> like grep
> > > > > > > > > > > > > > The correlation ID does help somewhat, but
> certainly
> > > > not
> > > > > as
> > > > > > > > much
> > > > > > > > > > as a
> > > > > > > > > > > > > > unique 64-bit ID.  The correlation ID is not
> unique
> > > in
> > > > > the
> > > > > > > > > broker,
> > > > > > > > > > > just
> > > > > > > > > > > > > > unique to a single NetworkClient.  Simiarly, the
> > > > > correlation
> > > > > > > ID
> > > > > > > > > is
> > > > > > > > > > > not
> > > > > > > > > > > > > > unique on the client side, if there are multiple
> > > > > Consumers,
> > > > > > > > etc.
> > > > > > > > > > > > > Can always bump entropy in correlation IDs, never
> had a
> > > > > problem
> > > > > > > > > > > > > of finding to many duplicates. Would be a
> different KIP
> > > > > though.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>> Similarly, if metadata responses had epoch
> numbers
> > > > > (simple
> > > > > > > > > > > > incrementing
> > > > > > > > > > > > > >>> numbers), we would not have to debug problems
> like
> > > > > clients
> > > > > > > > > > > > accidentally
> > > > > > > > > > > > > >>> getting old metadata from servers that had been
> > > > > partitioned
> > > > > > > > off
> > > > > > > > > > > from
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>> network for a while.  Clients would know the
> > > > difference
> > > > > > > > between
> > > > > > > > > > old
> > > > > > > > > > > > and
> > > > > > > > > > > > > >>> new metadata.  So putting epochs in to the
> metadata
> > > > > request
> > > > > > > > is
> > > > > > > > > > much
> > > > > > > > > > > > less
> > > > > > > > > > > > > >>> "complex" operationally, even though it's an
> extra
> > > > > field in
> > > > > > > > the
> > > > > > > > > > > > request.
> > > > > > > > > > > > > >>>    This has been discussed before on the
> mailing
> > > > list.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> So I think the bottom line for me is that
> having
> > > the
> > > > > > > session
> > > > > > > > ID
> > > > > > > > > > and
> > > > > > > > > > > > > >>> session epoch, while it adds two extra fields,
> > > > reduces
> > > > > > > > > > operational
> > > > > > > > > > > > > >>> complexity and increases debuggability.  It
> avoids
> > > > > tightly
> > > > > > > > > > coupling
> > > > > > > > > > > > us
> > > > > > > > > > > > > >>> to assumptions about reliable ordered delivery
> > > which
> > > > > tend
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > > > violated
> > > > > > > > > > > > > >>> in practice in multiple layers of the stack.
> > > > Finally,
> > > > > it
> > > > > > > > > avoids
> > > > > > > > > > > the
> > > > > > > > > > > > > >>> necessity of refactoring NetworkClient.
> > > > > > > > > > > > > >> So there is stacks out there that violate TCP
> > > > > guarantees?
> > > > > > > And
> > > > > > > > > > > software
> > > > > > > > > > > > > >> still works? How can this be? Can you elaborate
> a
> > > > little
> > > > > > > where
> > > > > > > > > > this
> > > > > > > > > > > > > >> can be violated? I am not very familiar with
> > > > virtualized
> > > > > > > > > > > environments
> > > > > > > > > > > > > >> but they can't really violate TCP contracts.
> > > > > > > > > > > > > > TCP's guarantees of reliable, in-order
> transmission
> > > > > certainly
> > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > > > > violated.  For example, I once had to debug a
> cluster
> > > > > where a
> > > > > > > > > > certain
> > > > > > > > > > > > > > node had a network card which corrupted its
> > > > transmissions
> > > > > > > > > > > occasionally.
> > > > > > > > > > > > > > With all the layers of checksums, you would think
> > > that
> > > > > this
> > > > > > > was
> > > > > > > > > not
> > > > > > > > > > > > > > possible, but it happened.  We occasionally got
> > > > corrupted
> > > > > > > data
> > > > > > > > > > > written
> > > > > > > > > > > > > > to disk on the other end because of it.  Even
> more
> > > > > > > frustrating,
> > > > > > > > > the
> > > > > > > > > > > > data
> > > > > > > > > > > > > > was not corrupted on disk on the sending node--
> it
> > > was
> > > > a
> > > > > bug
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > > > > network card driver that was injecting the
> errors.
> > > > > > > > > > > > > true, but your broker might aswell read a corrupted
> > > 600GB
> > > > > as
> > > > > > > size
> > > > > > > > > > from
> > > > > > > > > > > > > the network and die with OOM instantly.
> > > > > > > > > > > >
> > > > > > > > > > > > If you read 600 GB as the size from the network, you
> will
> > > > not
> > > > > > > "die
> > > > > > > > > with
> > > > > > > > > > > > OOM instantly."  That would be a bug.  Instead, you
> will
> > > > > notice
> > > > > > > > that
> > > > > > > > > > 600
> > > > > > > > > > > > GB is greater than max.message.bytes, and close the
> > > > > connection.
> > > > > > > > > > > >
> > > > > > > > > > > > > Optimizing for still having functional
> > > > > > > > > > > > > software under this circumstances is not
> reasonable.
> > > > > > > > > > > > > You want to get rid of such a
> > > > > > > > > > > > > node ASAP and pray that zookeepers ticks get
> corrupted
> > > > > often
> > > > > > > > enough
> > > > > > > > > > > > > that it finally drops out of the cluster.
> > > > > > > > > > > > >
> > > > > > > > > > > > > There is a good reason that these kinda things
> > > > > > > > > > > > > https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > > > > > > > > don't end up as kafka Jiras. In the end you can't
> run
> > > any
> > > > > > > > software
> > > > > > > > > in
> > > > > > > > > > > > > these containers anymore. Application layer
> checksums
> > > > are a
> > > > > > > neat
> > > > > > > > > > thing
> > > > > > > > > > > to
> > > > > > > > > > > > > fail fast but trying to cope with this probably
> causes
> > > > > more bad
> > > > > > > > > than
> > > > > > > > > > > > > good.  So I would argue that we shouldn't try this
> for
> > > > the
> > > > > > > fetch
> > > > > > > > > > > > requests.
> > > > > > > > > > > >
> > > > > > > > > > > > One of the goals of Apache Kafka is to be "a
> streaming
> > > > > > > platform...
> > > > > > > > > > > > [that] lets you store streams of records in a
> > > > fault-tolerant
> > > > > > > way."
> > > > > > > > > For
> > > > > > > > > > > > more information, see https://kafka.apache.org/intro
> .
> > > > > > > > > > Fault-tolerance
> > > > > > > > > > > > is explicitly part of the goal of Kafka.  Prayer
> should
> > > be
> > > > > > > > optional,
> > > > > > > > > > not
> > > > > > > > > > > > required, when running the software.
> > > > > > > > > > > >
> > > > > > > > > > > > Crashing because someone sent you a bad packet is not
> > > > > reasonable
> > > > > > > > > > > > behavior.  It is a bug.  Similarly, bringing down the
> > > whole
> > > > > > > > cluster,
> > > > > > > > > > > > which could a hundred nodes, because someone had a
> bad
> > > > > network
> > > > > > > > > adapter
> > > > > > > > > > > > is not reasonable behavior.  It is perhaps
> reasonable for
> > > > the
> > > > > > > > cluster
> > > > > > > > > > to
> > > > > > > > > > > > perform worse when hardware is having problems.  But
> > > > that's a
> > > > > > > > > different
> > > > > > > > > > > > discussion.
> > > > > > > > > > > >
> > > > > > > > > > > > best,
> > > > > > > > > > > > Colin
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > However, my point was not about TCP's guarantees
> > > being
> > > > > > > > violated.
> > > > > > > > > > My
> > > > > > > > > > > > > > point is that TCP's guarantees are only one small
> > > > > building
> > > > > > > > block
> > > > > > > > > to
> > > > > > > > > > > > > > build a robust distributed system.  TCP basically
> > > just
> > > > > says
> > > > > > > > that
> > > > > > > > > if
> > > > > > > > > > > you
> > > > > > > > > > > > > > get any bytes from the stream, you will get the
> ones
> > > > that
> > > > > > > were
> > > > > > > > > sent
> > > > > > > > > > > by
> > > > > > > > > > > > > > the sender, in the order they were sent.  TCP
> does
> > > not
> > > > > > > > guarantee
> > > > > > > > > > that
> > > > > > > > > > > > > > the bytes you send will get there.  It does not
> > > > guarantee
> > > > > > > that
> > > > > > > > if
> > > > > > > > > > you
> > > > > > > > > > > > > > close the connection, the other end will know
> about
> > > it
> > > > > in a
> > > > > > > > > timely
> > > > > > > > > > > > > > fashion.
> > > > > > > > > > > > > These are very powerful grantees and since we use
> TCP
> > > we
> > > > > should
> > > > > > > > > > > > > piggy pack everything that is reasonable on to it.
> IMO
> > > > > there is
> > > > > > > > no
> > > > > > > > > > > > > need to reimplement correct sequencing again if
> you get
> > > > > that
> > > > > > > from
> > > > > > > > > > > > > your transport layer. It saves you the complexity,
> it
> > > > makes
> > > > > > > > > > > > > you application behave way more naturally and your
> api
> > > > > easier
> > > > > > > to
> > > > > > > > > > > > > understand.
> > > > > > > > > > > > >
> > > > > > > > > > > > > There is literally nothing the Kernel wont let you
> > > decide
> > > > > > > > > > > > > especially not any timings. Only noticeable
> exception
> > > > being
> > > > > > > > > TIME_WAIT
> > > > > > > > > > > > > of usually 240 seconds but that already has little
> todo
> > > > > with
> > > > > > > the
> > > > > > > > > > broker
> > > > > > > > > > > > > itself and
> > > > > > > > > > > > > if we are running out of usable ports because of
> this
> > > > then
> > > > > > > > expiring
> > > > > > > > > > > > > fetch requests
> > > > > > > > > > > > > wont help much anyways.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I hope I could strengthen the trust you have in
> > > userland
> > > > > TCP
> > > > > > > > > > connection
> > > > > > > > > > > > > management. It is really powerful and can be
> exploited
> > > > for
> > > > > > > > maximum
> > > > > > > > > > > gains
> > > > > > > > > > > > > without much risk in my opinion.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > It does not guarantee that the bytes will be
> received
> > > > in
> > > > > a
> > > > > > > > > > > > > > certain timeframe, and certainly doesn't
> guarantee
> > > that
> > > > > if
> > > > > > > you
> > > > > > > > > > send a
> > > > > > > > > > > > > > byte on connection X and then on connection Y,
> that
> > > the
> > > > > > > remote
> > > > > > > > > end
> > > > > > > > > > > will
> > > > > > > > > > > > > > read a byte on X before reading a byte on Y.
> > > > > > > > > > > > > Noone expects this from two independent paths of
> any
> > > > kind.
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hope this made my view clearer, especially the
> first
> > > > > part.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Best Jan
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> best,
> > > > > > > > > > > > > >>> Colin
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>> If there is an error such as
> NotLeaderForPartition
> > > > is
> > > > > > > > > > > > > >>>> returned for some partitions, the follower can
> > > > always
> > > > > > > send a
> > > > > > > > > > full
> > > > > > > > > > > > > >>>> FetchRequest. Is there a scenario that only
> some
> > > of
> > > > > the
> > > > > > > > > > partitions
> > > > > > > > > > > > in a
> > > > > > > > > > > > > >>>> FetchResponse is lost?
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Jiangjie (Becket) Qin
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > > > > > cmccabe@apache.org
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin
> wrote:
> > > > > > > > > > > > > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin
> McCabe<
> > > > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > > > > >>>>> wrote:
> > > > > > > > > > > > > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin
> wrote:
> > > > > > > > > > > > > >>>>>>>> Hey Colin,
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>> Thanks much for the update. I have a few
> > > > questions
> > > > > > > > below:
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>> 1. I am not very sure that we need Fetch
> > > Session
> > > > > > > Epoch.
> > > > > > > > It
> > > > > > > > > > > > seems that
> > > > > > > > > > > > > >>>>>>>> Fetch
> > > > > > > > > > > > > >>>>>>>> Session Epoch is only needed to help
> leader
> > > > > > > distinguish
> > > > > > > > > > > between
> > > > > > > > > > > > "a
> > > > > > > > > > > > > >>>>> full
> > > > > > > > > > > > > >>>>>>>> fetch request" and "a full fetch request
> and
> > > > > request a
> > > > > > > > new
> > > > > > > > > > > > > >>>>> incremental
> > > > > > > > > > > > > >>>>>>>> fetch session". Alternatively, follower
> can
> > > also
> > > > > > > > indicate
> > > > > > > > > "a
> > > > > > > > > > > > full
> > > > > > > > > > > > > >>>>> fetch
> > > > > > > > > > > > > >>>>>>>> request and request a new incremental
> fetch
> > > > > session"
> > > > > > > by
> > > > > > > > > > > setting
> > > > > > > > > > > > Fetch
> > > > > > > > > > > > > >>>>>>>> Session ID to -1 without using Fetch
> Session
> > > > > Epoch.
> > > > > > > Does
> > > > > > > > > > this
> > > > > > > > > > > > make
> > > > > > > > > > > > > >>>>> sense?
> > > > > > > > > > > > > >>>>>>> Hi Dong,
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> The fetch session epoch is very important
> for
> > > > > ensuring
> > > > > > > > > > > > correctness.  It
> > > > > > > > > > > > > >>>>>>> prevents corrupted or incomplete fetch
> data due
> > > > to
> > > > > > > > network
> > > > > > > > > > > > reordering
> > > > > > > > > > > > > >>>>> or
> > > > > > > > > > > > > >>>>>>> loss.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> For example, consider a scenario where the
> > > > follower
> > > > > > > > sends a
> > > > > > > > > > > fetch
> > > > > > > > > > > > > >>>>>>> request to the leader.  The leader
> responds,
> > > but
> > > > > the
> > > > > > > > > response
> > > > > > > > > > > is
> > > > > > > > > > > > lost
> > > > > > > > > > > > > >>>>>>> because of network problems which affected
> the
> > > > TCP
> > > > > > > > session.
> > > > > > > > > > In
> > > > > > > > > > > > that
> > > > > > > > > > > > > >>>>>>> case, the follower must establish a new TCP
> > > > > session and
> > > > > > > > > > re-send
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>>>>>> incremental fetch request.  But the leader
> does
> > > > not
> > > > > > > know
> > > > > > > > > that
> > > > > > > > > > > the
> > > > > > > > > > > > > >>>>>>> follower didn't receive the previous
> > > incremental
> > > > > fetch
> > > > > > > > > > > > response.  It is
> > > > > > > > > > > > > >>>>>>> only the incremental fetch epoch which
> lets the
> > > > > leader
> > > > > > > > know
> > > > > > > > > > > that
> > > > > > > > > > > > it
> > > > > > > > > > > > > >>>>>>> needs to resend that data, and not data
> which
> > > > comes
> > > > > > > > > > afterwards.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> You could construct similar scenarios with
> > > > message
> > > > > > > > > > reordering,
> > > > > > > > > > > > > >>>>>>> duplication, etc.  Basically, this is a
> > > stateful
> > > > > > > protocol
> > > > > > > > > on
> > > > > > > > > > an
> > > > > > > > > > > > > >>>>>>> unreliable network, and you need to know
> > > whether
> > > > > the
> > > > > > > > > follower
> > > > > > > > > > > > got the
> > > > > > > > > > > > > >>>>>>> previous data you sent before you move
> on.  And
> > > > you
> > > > > > > need
> > > > > > > > to
> > > > > > > > > > > > handle
> > > > > > > > > > > > > >>>>>>> issues like duplicated or delayed requests.
> > > > These
> > > > > > > issues
> > > > > > > > > do
> > > > > > > > > > > not
> > > > > > > > > > > > affect
> > > > > > > > > > > > > >>>>>>> the full fetch request, because it is not
> > > > > stateful--
> > > > > > > any
> > > > > > > > > full
> > > > > > > > > > > > fetch
> > > > > > > > > > > > > >>>>>>> request can be understood and properly
> > > responded
> > > > > to in
> > > > > > > > > > > isolation.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>> Thanks for the explanation. This makes
> sense. On
> > > > the
> > > > > > > other
> > > > > > > > > > hand
> > > > > > > > > > > I
> > > > > > > > > > > > would
> > > > > > > > > > > > > >>>>>> be interested in learning more about whether
> > > > > Becket's
> > > > > > > > > solution
> > > > > > > > > > > > can help
> > > > > > > > > > > > > >>>>>> simplify the protocol by not having the echo
> > > field
> > > > > and
> > > > > > > > > whether
> > > > > > > > > > > > that is
> > > > > > > > > > > > > >>>>>> worth doing.
> > > > > > > > > > > > > >>>>> Hi Dong,
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> I commented about this in the other thread.
> A
> > > > > solution
> > > > > > > > which
> > > > > > > > > > > > doesn't
> > > > > > > > > > > > > >>>>> maintain session information doesn't work
> here.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>>>> 2. It is said that Incremental
> FetchRequest
> > > will
> > > > > > > include
> > > > > > > > > > > > partitions
> > > > > > > > > > > > > >>>>> whose
> > > > > > > > > > > > > >>>>>>>> fetch offset or maximum number of fetch
> bytes
> > > > has
> > > > > been
> > > > > > > > > > > changed.
> > > > > > > > > > > > If
> > > > > > > > > > > > > >>>>>>>> follower's logStartOffet of a partition
> has
> > > > > changed,
> > > > > > > > > should
> > > > > > > > > > > this
> > > > > > > > > > > > > >>>>>>>> partition also be included in the next
> > > > > FetchRequest to
> > > > > > > > the
> > > > > > > > > > > > leader?
> > > > > > > > > > > > > >>>>>>> Otherwise, it
> > > > > > > > > > > > > >>>>>>>> may affect the handling of
> > > DeleteRecordsRequest
> > > > > > > because
> > > > > > > > > > leader
> > > > > > > > > > > > may
> > > > > > > > > > > > > >>>>> not
> > > > > > > > > > > > > >>>>>>> know
> > > > > > > > > > > > > >>>>>>>> the corresponding data has been deleted
> on the
> > > > > > > follower.
> > > > > > > > > > > > > >>>>>>> Yeah, the follower should include the
> partition
> > > > if
> > > > > the
> > > > > > > > > > > > logStartOffset
> > > > > > > > > > > > > >>>>>>> has changed.  That should be spelled out
> on the
> > > > > KIP.
> > > > > > > > > Fixed.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>> 3. In the section "Per-Partition Data", a
> > > > > partition is
> > > > > > > > not
> > > > > > > > > > > > considered
> > > > > > > > > > > > > >>>>>>>> dirty if its log start offset has changed.
> > > Later
> > > > > in
> > > > > > > the
> > > > > > > > > > > section
> > > > > > > > > > > > > >>>>>>> "FetchRequest
> > > > > > > > > > > > > >>>>>>>> Changes", it is said that incremental
> fetch
> > > > > responses
> > > > > > > > will
> > > > > > > > > > > > include a
> > > > > > > > > > > > > >>>>>>>> partition if its logStartOffset has
> changed.
> > > It
> > > > > seems
> > > > > > > > > > > > inconsistent.
> > > > > > > > > > > > > >>>>> Can
> > > > > > > > > > > > > >>>>>>>> you update the KIP to clarify it?
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>> In the "Per-Partition Data" section, it
> does
> > > say
> > > > > that
> > > > > > > > > > > > logStartOffset
> > > > > > > > > > > > > >>>>>>> changes make a partition dirty, though,
> right?
> > > > The
> > > > > > > first
> > > > > > > > > > > bullet
> > > > > > > > > > > > point
> > > > > > > > > > > > > >>>>>>> is:
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>> * The LogCleaner deletes messages, and
> this
> > > > > changes
> > > > > > > the
> > > > > > > > > log
> > > > > > > > > > > > start
> > > > > > > > > > > > > >>>>> offset
> > > > > > > > > > > > > >>>>>>> of the partition on the leader., or
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>> Ah I see. I think I didn't notice this
> because
> > > > > statement
> > > > > > > > > > assumes
> > > > > > > > > > > > that the
> > > > > > > > > > > > > >>>>>> LogStartOffset in the leader only changes
> due to
> > > > > > > > LogCleaner.
> > > > > > > > > > In
> > > > > > > > > > > > fact the
> > > > > > > > > > > > > >>>>>> LogStartOffset can change on the leader due
> to
> > > > > either
> > > > > > > log
> > > > > > > > > > > > retention and
> > > > > > > > > > > > > >>>>>> DeleteRecordsRequest. I haven't verified
> whether
> > > > > > > > LogCleaner
> > > > > > > > > > can
> > > > > > > > > > > > change
> > > > > > > > > > > > > >>>>>> LogStartOffset though. It may be a bit
> better to
> > > > > just
> > > > > > > say
> > > > > > > > > > that a
> > > > > > > > > > > > > >>>>>> partition is considered dirty if
> LogStartOffset
> > > > > changes.
> > > > > > > > > > > > > >>>>> I agree.  It should be straightforward to
> just
> > > > > resend the
> > > > > > > > > > > > partition if
> > > > > > > > > > > > > >>>>> logStartOffset changes.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>>>> 4. In "Fetch Session Caching" section, it
> is
> > > > said
> > > > > that
> > > > > > > > > each
> > > > > > > > > > > > broker
> > > > > > > > > > > > > >>>>> has a
> > > > > > > > > > > > > >>>>>>>> limited number of slots. How is this
> number
> > > > > > > determined?
> > > > > > > > > Does
> > > > > > > > > > > > this
> > > > > > > > > > > > > >>>>> require
> > > > > > > > > > > > > >>>>>>>> a new broker config for this number?
> > > > > > > > > > > > > >>>>>>> Good point.  I added two broker
> configuration
> > > > > > > parameters
> > > > > > > > to
> > > > > > > > > > > > control
> > > > > > > > > > > > > >>>>> this
> > > > > > > > > > > > > >>>>>>> number.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>> I am curious to see whether we can avoid
> some of
> > > > > these
> > > > > > > new
> > > > > > > > > > > > configs. For
> > > > > > > > > > > > > >>>>>> example, incremental.fetch.session.
> > > > > > > cache.slots.per.broker
> > > > > > > > > is
> > > > > > > > > > > > probably
> > > > > > > > > > > > > >>>>> not
> > > > > > > > > > > > > >>>>>> necessary because if a leader knows that a
> > > > > FetchRequest
> > > > > > > > > comes
> > > > > > > > > > > > from a
> > > > > > > > > > > > > >>>>>> follower, we probably want the leader to
> always
> > > > > cache
> > > > > > > the
> > > > > > > > > > > > information
> > > > > > > > > > > > > >>>>>> from that follower. Does this make sense?
> > > > > > > > > > > > > >>>>> Yeah, maybe we can avoid having
> > > > > > > > > > > > > >>>>> incremental.fetch.session.
> > > cache.slots.per.broker.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>> Maybe we can discuss the config later after
> > > there
> > > > is
> > > > > > > > > agreement
> > > > > > > > > > > on
> > > > > > > > > > > > how the
> > > > > > > > > > > > > >>>>>> protocol would look like.
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>>>> What is the error code if broker does
> > > > > > > > > > > > > >>>>>>>> not have new log for the incoming
> > > FetchRequest?
> > > > > > > > > > > > > >>>>>>> Hmm, is there a typo in this question?
> Maybe
> > > you
> > > > > meant
> > > > > > > > to
> > > > > > > > > > ask
> > > > > > > > > > > > what
> > > > > > > > > > > > > >>>>>>> happens if there is no new cache slot for
> the
> > > > > incoming
> > > > > > > > > > > > FetchRequest?
> > > > > > > > > > > > > >>>>>>> That's not an error-- the incremental fetch
> > > > > session ID
> > > > > > > > just
> > > > > > > > > > > gets
> > > > > > > > > > > > set to
> > > > > > > > > > > > > >>>>>>> 0, indicating no incremental fetch session
> was
> > > > > created.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>> Yeah there is a typo. You have answered my
> > > > question.
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>>>> 5. Can you clarify what happens if
> follower
> > > > adds a
> > > > > > > > > partition
> > > > > > > > > > > to
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>>>>>>> ReplicaFetcherThread after receiving
> > > > > > > > LeaderAndIsrRequest?
> > > > > > > > > > Does
> > > > > > > > > > > > leader
> > > > > > > > > > > > > >>>>>>>> needs to generate a new session for this
> > > > > > > > > > ReplicaFetcherThread
> > > > > > > > > > > or
> > > > > > > > > > > > > >>>>> does it
> > > > > > > > > > > > > >>>>>>> re-use
> > > > > > > > > > > > > >>>>>>>> the existing session?  If it uses a new
> > > session,
> > > > > is
> > > > > > > the
> > > > > > > > > old
> > > > > > > > > > > > session
> > > > > > > > > > > > > >>>>>>>> actively deleted from the slot?
> > > > > > > > > > > > > >>>>>>> The basic idea is that you can't make
> changes,
> > > > > except
> > > > > > > by
> > > > > > > > > > > sending
> > > > > > > > > > > > a full
> > > > > > > > > > > > > >>>>>>> fetch request.  However, perhaps we can
> allow
> > > the
> > > > > > > client
> > > > > > > > to
> > > > > > > > > > > > re-use its
> > > > > > > > > > > > > >>>>>>> existing session ID.  If the client sets
> > > > sessionId
> > > > > =
> > > > > > > id,
> > > > > > > > > > epoch
> > > > > > > > > > > =
> > > > > > > > > > > > 0, it
> > > > > > > > > > > > > >>>>>>> could re-initialize the session.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>> Yeah I agree with the basic idea. We
> probably
> > > want
> > > > > to
> > > > > > > > > > understand
> > > > > > > > > > > > more
> > > > > > > > > > > > > >>>>>> detail about how this works later.
> > > > > > > > > > > > > >>>>> Sounds good.  I updated the KIP with this
> > > > > information.  A
> > > > > > > > > > > > > >>>>> re-initialization should be exactly the same
> as
> > > an
> > > > > > > > > > > initialization,
> > > > > > > > > > > > > >>>>> except that it reuses an existing ID.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> best,
> > > > > > > > > > > > > >>>>> Colin
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>>>> BTW, I think it may be useful if the KIP
> can
> > > > > include
> > > > > > > the
> > > > > > > > > > > example
> > > > > > > > > > > > > >>>>> workflow
> > > > > > > > > > > > > >>>>>>>> of how this feature will be used in case
> of
> > > > > partition
> > > > > > > > > change
> > > > > > > > > > > > and so
> > > > > > > > > > > > > >>>>> on.
> > > > > > > > > > > > > >>>>>>> Yeah, that might help.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> best,
> > > > > > > > > > > > > >>>>>>> Colin
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > > > > > >>>>>>>> Dong
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin
> > > McCabe<
> > > > > > > > > > > > cmccabe@apache.org>
> > > > > > > > > > > > > >>>>>>>> wrote:
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> I updated the KIP with the ideas we've
> been
> > > > > > > discussing.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> best,
> > > > > > > > > > > > > >>>>>>>>> Colin
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin
> McCabe
> > > > > wrote:
> > > > > > > > > > > > > >>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan
> Filipiak
> > > > > wrote:
> > > > > > > > > > > > > >>>>>>>>>>> Hi Colin, thank you  for this KIP, it
> can
> > > > > become a
> > > > > > > > > really
> > > > > > > > > > > > > >>>>> useful
> > > > > > > > > > > > > >>>>>>> thing.
> > > > > > > > > > > > > >>>>>>>>>>> I just scanned through the discussion
> so
> > > far
> > > > > and
> > > > > > > > wanted
> > > > > > > > > > to
> > > > > > > > > > > > > >>>>> start a
> > > > > > > > > > > > > >>>>>>>>>>> thread to make as decision about
> keeping
> > > the
> > > > > > > > > > > > > >>>>>>>>>>> cache with the Connection / Session or
> > > having
> > > > > some
> > > > > > > > sort
> > > > > > > > > > of
> > > > > > > > > > > > UUID
> > > > > > > > > > > > > >>>>>>> indN
> > > > > > > > > > > > > >>>>>>>>> exed
> > > > > > > > > > > > > >>>>>>>>>>> global Map.
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> Sorry if that has been settled already
> and
> > > I
> > > > > missed
> > > > > > > > it.
> > > > > > > > > > In
> > > > > > > > > > > > this
> > > > > > > > > > > > > >>>>>>> case
> > > > > > > > > > > > > >>>>>>>>>>> could anyone point me to the
> discussion?
> > > > > > > > > > > > > >>>>>>>>>> Hi Jan,
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> I don't think anyone has discussed the
> idea
> > > of
> > > > > tying
> > > > > > > > the
> > > > > > > > > > > cache
> > > > > > > > > > > > > >>>>> to an
> > > > > > > > > > > > > >>>>>>>>>> individual TCP session yet.  I agree
> that
> > > > since
> > > > > the
> > > > > > > > > cache
> > > > > > > > > > is
> > > > > > > > > > > > > >>>>>>> intended to
> > > > > > > > > > > > > >>>>>>>>>> be used only by a single follower or
> client,
> > > > > it's an
> > > > > > > > > > > > interesting
> > > > > > > > > > > > > >>>>>>> thing
> > > > > > > > > > > > > >>>>>>>>>> to think about.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> I guess the obvious disadvantage is that
> > > > > whenever
> > > > > > > your
> > > > > > > > > TCP
> > > > > > > > > > > > > >>>>> session
> > > > > > > > > > > > > >>>>>>>>>> drops, you have to make a full fetch
> request
> > > > > rather
> > > > > > > > than
> > > > > > > > > > an
> > > > > > > > > > > > > >>>>>>> incremental
> > > > > > > > > > > > > >>>>>>>>>> one.  It's not clear to me how often
> this
> > > > > happens in
> > > > > > > > > > > practice
> > > > > > > > > > > > --
> > > > > > > > > > > > > >>>>> it
> > > > > > > > > > > > > >>>>>>>>>> probably depends a lot on the quality
> of the
> > > > > > > network.
> > > > > > > > > > From
> > > > > > > > > > > a
> > > > > > > > > > > > > >>>>> code
> > > > > > > > > > > > > >>>>>>>>>> perspective, it might also be a bit
> > > difficult
> > > > to
> > > > > > > > access
> > > > > > > > > > data
> > > > > > > > > > > > > >>>>>>> associated
> > > > > > > > > > > > > >>>>>>>>>> with the Session from classes like
> KafkaApis
> > > > > > > (although
> > > > > > > > > we
> > > > > > > > > > > > could
> > > > > > > > > > > > > >>>>>>> refactor
> > > > > > > > > > > > > >>>>>>>>>> it to make this easier).
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> It's also clear that even if we tie the
> > > cache
> > > > > to the
> > > > > > > > > > > session,
> > > > > > > > > > > > we
> > > > > > > > > > > > > >>>>>>> still
> > > > > > > > > > > > > >>>>>>>>>> have to have limits on the number of
> caches
> > > > > we're
> > > > > > > > > willing
> > > > > > > > > > to
> > > > > > > > > > > > > >>>>> create.
> > > > > > > > > > > > > >>>>>>>>>> And probably we should reserve some
> cache
> > > > slots
> > > > > for
> > > > > > > > each
> > > > > > > > > > > > > >>>>> follower, so
> > > > > > > > > > > > > >>>>>>>>>> that clients don't take all of them.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> Id rather see a protocol in which the
> > > client
> > > > is
> > > > > > > > hinting
> > > > > > > > > > the
> > > > > > > > > > > > > >>>>> broker
> > > > > > > > > > > > > >>>>>>>>> that,
> > > > > > > > > > > > > >>>>>>>>>>> he is going to use the feature instead
> of a
> > > > > client
> > > > > > > > > > > > > >>>>>>>>>>> realizing that the broker just offered
> the
> > > > > feature
> > > > > > > > > > > > (regardless
> > > > > > > > > > > > > >>>>> of
> > > > > > > > > > > > > >>>>>>>>>>> protocol version which should only
> indicate
> > > > > that
> > > > > > > the
> > > > > > > > > > > feature
> > > > > > > > > > > > > >>>>>>>>>>> would be usable).
> > > > > > > > > > > > > >>>>>>>>>> Hmm.  I'm not sure what you mean by
> > > "hinting."
> > > > > I do
> > > > > > > > > think
> > > > > > > > > > > > that
> > > > > > > > > > > > > >>>>> the
> > > > > > > > > > > > > >>>>>>>>>> server should have the option of not
> > > accepting
> > > > > > > > > incremental
> > > > > > > > > > > > > >>>>> requests
> > > > > > > > > > > > > >>>>>>> from
> > > > > > > > > > > > > >>>>>>>>>> specific clients, in order to save
> memory
> > > > space.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> This seems to work better with a per
> > > > > > > > > > > > > >>>>>>>>>>> connection/session attached Metadata
> than
> > > > with
> > > > > a
> > > > > > > Map
> > > > > > > > > and
> > > > > > > > > > > > could
> > > > > > > > > > > > > >>>>>>> allow
> > > > > > > > > > > > > >>>>>>>>> for
> > > > > > > > > > > > > >>>>>>>>>>> easier client implementations.
> > > > > > > > > > > > > >>>>>>>>>>> It would also make Client-side code
> easier
> > > as
> > > > > there
> > > > > > > > > > > wouldn't
> > > > > > > > > > > > > >>>>> be any
> > > > > > > > > > > > > >>>>>>>>>>> Cache-miss error Messages to handle.
> > > > > > > > > > > > > >>>>>>>>>> It is nice not to have to handle
> cache-miss
> > > > > > > > responses, I
> > > > > > > > > > > > agree.
> > > > > > > > > > > > > >>>>>>>>>> However, TCP sessions aren't exposed to
> most
> > > > of
> > > > > our
> > > > > > > > > > > > client-side
> > > > > > > > > > > > > >>>>> code.
> > > > > > > > > > > > > >>>>>>>>>> For example, when the Producer creates a
> > > > > message and
> > > > > > > > > hands
> > > > > > > > > > > it
> > > > > > > > > > > > > >>>>> off to
> > > > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > > > >>>>>>>>>> NetworkClient, the NC will transparently
> > > > > re-connect
> > > > > > > > and
> > > > > > > > > > > > re-send a
> > > > > > > > > > > > > >>>>>>>>>> message if the first send failed.  The
> > > > > higher-level
> > > > > > > > code
> > > > > > > > > > > will
> > > > > > > > > > > > > >>>>> not be
> > > > > > > > > > > > > >>>>>>>>>> informed about whether the TCP session
> was
> > > > > > > > > re-established,
> > > > > > > > > > > > > >>>>> whether an
> > > > > > > > > > > > > >>>>>>>>>> existing TCP session was used, and so
> on.
> > > So
> > > > > > > overall
> > > > > > > > I
> > > > > > > > > > > would
> > > > > > > > > > > > > >>>>> still
> > > > > > > > > > > > > >>>>>>> lean
> > > > > > > > > > > > > >>>>>>>>>> towards not coupling this to the TCP
> > > > session...
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> best,
> > > > > > > > > > > > > >>>>>>>>>> Colin
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>     Thank you again for the KIP. And
> again,
> > > > if
> > > > > this
> > > > > > > > was
> > > > > > > > > > > > clarified
> > > > > > > > > > > > > >>>>>>> already
> > > > > > > > > > > > > >>>>>>>>>>> please drop me a hint where I could
> read
> > > > about
> > > > > it.
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> Best Jan
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe
> wrote:
> > > > > > > > > > > > > >>>>>>>>>>>> Hi all,
> > > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> I created a KIP to improve the
> scalability
> > > > and
> > > > > > > > latency
> > > > > > > > > > of
> > > > > > > > > > > > > >>>>>>>>> FetchRequest:
> > > > > > > > > > > > > >>>>>>>>>>>> https://cwiki.apache.org/
> > > > > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > > > > >>>>>>>>> 227%3A+Introduce+Incremental+
> > > > > > > > FetchRequests+to+Increase+
> > > > > > > > > > > > > >>>>>>>>> Partition+Scalability
> > > > > > > > > > > > > >>>>>>>>>>>> Please take a look.
> > > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> cheers,
> > > > > > > > > > > > > >>>>>>>>>>>> Colin
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message